Skip to content

Commit

Permalink
Revert "Nexus: Delete state machine on terminal state -- Part 3 (#6984)…
Browse files Browse the repository at this point in the history
…" (#7024)

This reverts commit d89f514.

## Why?

It causes issues in the replication stack as indicated by our nightly
pipelines.
  • Loading branch information
bergundy authored Dec 20, 2024
1 parent 4c3e397 commit 5ff0650
Show file tree
Hide file tree
Showing 9 changed files with 58 additions and 501 deletions.
53 changes: 11 additions & 42 deletions components/nexusoperations/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (d ScheduledEventDefinition) Apply(root *hsm.Node, event *historypb.History
if err != nil {
return err
}
_, err = AddChild(root, strconv.FormatInt(event.EventId, 10), event, token)
_, err = AddChild(root, strconv.FormatInt(event.EventId, 10), event, token, true)
return err
}

Expand All @@ -67,11 +67,9 @@ func (d CancelRequestedEventDefinition) Type() enumspb.EventType {
}

func (d CancelRequestedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error {
_, err := transitionOperation(root, event, func(node *hsm.Node, o Operation) (hsm.TransitionOutput, error) {
return transitionOperation(root, event, func(node *hsm.Node, o Operation) (hsm.TransitionOutput, error) {
return o.Cancel(node, event.EventTime.AsTime())
})

return err
}

func (d CancelRequestedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, _ map[enumspb.ResetReapplyExcludeType]struct{}) error {
Expand All @@ -90,15 +88,13 @@ func (d StartedEventDefinition) Type() enumspb.EventType {
}

func (d StartedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error {
_, err := transitionOperation(root, event, func(node *hsm.Node, o Operation) (hsm.TransitionOutput, error) {
return transitionOperation(root, event, func(node *hsm.Node, o Operation) (hsm.TransitionOutput, error) {
return TransitionStarted.Apply(o, EventStarted{
Time: event.EventTime.AsTime(),
Node: node,
Attributes: event.GetNexusOperationStartedEventAttributes(),
})
})

return err
}

func (d StartedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error {
Expand All @@ -115,17 +111,12 @@ func (d CompletedEventDefinition) IsWorkflowTaskTrigger() bool {
}

func (d CompletedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error {
node, err := transitionOperation(root, event, func(node *hsm.Node, o Operation) (hsm.TransitionOutput, error) {
return transitionOperation(root, event, func(node *hsm.Node, o Operation) (hsm.TransitionOutput, error) {
return TransitionSucceeded.Apply(o, EventSucceeded{
Time: event.EventTime.AsTime(),
Node: node,
})
})
if err != nil {
return err
}

return root.DeleteChild(node.Key)
}

func (d CompletedEventDefinition) Type() enumspb.EventType {
Expand All @@ -150,18 +141,13 @@ func (d FailedEventDefinition) Type() enumspb.EventType {
}

func (d FailedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error {
node, err := transitionOperation(root, event, func(node *hsm.Node, o Operation) (hsm.TransitionOutput, error) {
return transitionOperation(root, event, func(node *hsm.Node, o Operation) (hsm.TransitionOutput, error) {
return TransitionFailed.Apply(o, EventFailed{
Time: event.EventTime.AsTime(),
Attributes: event.GetNexusOperationFailedEventAttributes(),
Node: node,
})
})
if err != nil {
return err
}

return root.DeleteChild(node.Key)
}

func (d FailedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error {
Expand All @@ -182,17 +168,12 @@ func (d CanceledEventDefinition) Type() enumspb.EventType {
}

func (d CanceledEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error {
node, err := transitionOperation(root, event, func(node *hsm.Node, o Operation) (hsm.TransitionOutput, error) {
return transitionOperation(root, event, func(node *hsm.Node, o Operation) (hsm.TransitionOutput, error) {
return TransitionCanceled.Apply(o, EventCanceled{
Time: event.EventTime.AsTime(),
Node: node,
})
})
if err != nil {
return err
}

return root.DeleteChild(node.Key)
}

func (d CanceledEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error {
Expand All @@ -213,16 +194,11 @@ func (d TimedOutEventDefinition) Type() enumspb.EventType {
}

func (d TimedOutEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error {
node, err := transitionOperation(root, event, func(node *hsm.Node, o Operation) (hsm.TransitionOutput, error) {
return transitionOperation(root, event, func(node *hsm.Node, o Operation) (hsm.TransitionOutput, error) {
return TransitionTimedOut.Apply(o, EventTimedOut{
Node: node,
})
})
if err != nil {
return err
}

return root.DeleteChild(node.Key)
}

func (d TimedOutEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error {
Expand Down Expand Up @@ -254,21 +230,14 @@ func RegisterEventDefinitions(reg *hsm.Registry) error {
return reg.RegisterEventDefinition(TimedOutEventDefinition{})
}

func transitionOperation(
root *hsm.Node,
event *historypb.HistoryEvent,
fn func(node *hsm.Node, o Operation) (hsm.TransitionOutput, error),
) (*hsm.Node, error) {
func transitionOperation(root *hsm.Node, event *historypb.HistoryEvent, fn func(node *hsm.Node, o Operation) (hsm.TransitionOutput, error)) error {
node, err := findOperationNode(root, event)
if err != nil {
return nil, err
return err
}
if err := hsm.MachineTransition(node, func(o Operation) (hsm.TransitionOutput, error) {
return hsm.MachineTransition(node, func(o Operation) (hsm.TransitionOutput, error) {
return fn(node, o)
}); err != nil {
return nil, err
}
return node, nil
})
}

func findOperationNode(root *hsm.Node, event *historypb.HistoryEvent) (*hsm.Node, error) {
Expand Down
97 changes: 0 additions & 97 deletions components/nexusoperations/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
package nexusoperations_test

import (
"strconv"
"testing"
"time"

Expand All @@ -33,7 +32,6 @@ import (
"go.temporal.io/server/components/nexusoperations"
"go.temporal.io/server/service/history/hsm"
"go.temporal.io/server/service/history/hsm/hsmtest"
"google.golang.org/protobuf/types/known/timestamppb"
)

func TestCherryPick(t *testing.T) {
Expand Down Expand Up @@ -142,98 +140,3 @@ func TestCherryPick(t *testing.T) {
}
})
}

func TestTerminalStatesDeletion(t *testing.T) {
testCases := []struct {
name string
def hsm.EventDefinition
attributes interface{}
}{
{
name: "CompletedDeletesStateMachine",
def: nexusoperations.CompletedEventDefinition{},
attributes: &historypb.NexusOperationCompletedEventAttributes{
ScheduledEventId: 0,
},
},
{
name: "FailedDeletesStateMachine",
def: nexusoperations.FailedEventDefinition{},
attributes: &historypb.NexusOperationFailedEventAttributes{
ScheduledEventId: 0,
},
},
{
name: "CanceledDeletesStateMachine",
def: nexusoperations.CanceledEventDefinition{},
attributes: &historypb.NexusOperationCanceledEventAttributes{
ScheduledEventId: 0,
},
},
{
name: "TimedOutDeletesStateMachine",
def: nexusoperations.TimedOutEventDefinition{},
attributes: &historypb.NexusOperationTimedOutEventAttributes{
ScheduledEventId: 0,
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
node := newOperationNode(t, &hsmtest.NodeBackend{}, mustNewScheduledEvent(time.Now(), time.Hour))
op, err := hsm.MachineData[nexusoperations.Operation](node)
require.NoError(t, err)
eventID, err := hsm.EventIDFromToken(op.ScheduledEventToken)
require.NoError(t, err)

// Update the event ID in attributes
switch a := tc.attributes.(type) {
case *historypb.NexusOperationCompletedEventAttributes:
a.ScheduledEventId = eventID
case *historypb.NexusOperationFailedEventAttributes:
a.ScheduledEventId = eventID
case *historypb.NexusOperationCanceledEventAttributes:
a.ScheduledEventId = eventID
case *historypb.NexusOperationTimedOutEventAttributes:
a.ScheduledEventId = eventID
}

event := &historypb.HistoryEvent{
EventTime: timestamppb.Now(),
}

switch d := tc.def.(type) {
case nexusoperations.CompletedEventDefinition:
event.EventType = d.Type()
event.Attributes = &historypb.HistoryEvent_NexusOperationCompletedEventAttributes{
NexusOperationCompletedEventAttributes: tc.attributes.(*historypb.NexusOperationCompletedEventAttributes),
}
case nexusoperations.FailedEventDefinition:
event.EventType = d.Type()
event.Attributes = &historypb.HistoryEvent_NexusOperationFailedEventAttributes{
NexusOperationFailedEventAttributes: tc.attributes.(*historypb.NexusOperationFailedEventAttributes),
}
case nexusoperations.CanceledEventDefinition:
event.EventType = d.Type()
event.Attributes = &historypb.HistoryEvent_NexusOperationCanceledEventAttributes{
NexusOperationCanceledEventAttributes: tc.attributes.(*historypb.NexusOperationCanceledEventAttributes),
}
case nexusoperations.TimedOutEventDefinition:
event.EventType = d.Type()
event.Attributes = &historypb.HistoryEvent_NexusOperationTimedOutEventAttributes{
NexusOperationTimedOutEventAttributes: tc.attributes.(*historypb.NexusOperationTimedOutEventAttributes),
}
default:
t.Fatalf("unknown event definition type: %T", tc.def)
}

err = tc.def.Apply(node.Parent, event)
require.NoError(t, err)

coll := nexusoperations.MachineCollection(node.Parent)
_, err = coll.Node(strconv.FormatInt(eventID, 10))
require.ErrorIs(t, err, hsm.ErrStateMachineNotFound)
})
}
}
2 changes: 1 addition & 1 deletion components/nexusoperations/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func newOperationNode(t *testing.T, backend *hsmtest.NodeBackend, event *history
root := newRoot(t, backend)
token, err := hsm.GenerateEventLoadToken(event)
require.NoError(t, err)
node, err := nexusoperations.AddChild(root, fmt.Sprintf("%d", event.EventId), event, token)
node, err := nexusoperations.AddChild(root, fmt.Sprintf("%d", event.EventId), event, token, false)
require.NoError(t, err)
return node
}
Expand Down
6 changes: 4 additions & 2 deletions components/nexusoperations/statemachine.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type Operation struct {
}

// AddChild adds a new operation child machine to the given node and transitions it to the SCHEDULED state.
func AddChild(node *hsm.Node, id string, event *historypb.HistoryEvent, eventToken []byte) (*hsm.Node, error) {
func AddChild(node *hsm.Node, id string, event *historypb.HistoryEvent, eventToken []byte, deleteOnCompletion bool) (*hsm.Node, error) {
attrs := event.GetNexusOperationScheduledEventAttributes()

node, err := node.AddChild(hsm.Key{Type: OperationMachineType, ID: id}, Operation{
Expand All @@ -78,7 +78,9 @@ func AddChild(node *hsm.Node, id string, event *historypb.HistoryEvent, eventTok
ScheduleToCloseTimeout: attrs.ScheduleToCloseTimeout,
RequestId: attrs.RequestId,
State: enumsspb.NEXUS_OPERATION_STATE_UNSPECIFIED,
ScheduledEventToken: eventToken,
// TODO(bergundy): actually delete on completion if this is set.
DeleteOnCompletion: deleteOnCompletion,
ScheduledEventToken: eventToken,
},
})

Expand Down
2 changes: 1 addition & 1 deletion components/nexusoperations/statemachine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestAddChild(t *testing.T) {
},
},
}
child, err := nexusoperations.AddChild(root, "test-id", event, []byte("token"))
child, err := nexusoperations.AddChild(root, "test-id", event, []byte("token"), false)
require.NoError(t, err)
opLog, err := root.Outputs()
require.NoError(t, err)
Expand Down
41 changes: 24 additions & 17 deletions components/nexusoperations/workflow/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,27 +209,37 @@ func (ch *commandHandler) HandleCancelCommand(
Message: "empty CancelNexusOperationCommandAttributes",
}
}

coll := nexusoperations.MachineCollection(ms.HSM())
nodeID := strconv.FormatInt(attrs.ScheduledEventId, 10)
_, err := coll.Node(nodeID)
node, err := coll.Node(nodeID)
if err != nil {
if errors.Is(err, hsm.ErrStateMachineNotFound) {
if !ms.HasAnyBufferedEvent(makeNexusOperationTerminalEventFilter(attrs.ScheduledEventId)) {
return workflow.FailWorkflowTaskError{
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_REQUEST_CANCEL_NEXUS_OPERATION_ATTRIBUTES,
Message: fmt.Sprintf("requested cancelation for a non-existing or already completed operation with scheduled event ID of %d", attrs.ScheduledEventId),
}
return workflow.FailWorkflowTaskError{
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_REQUEST_CANCEL_NEXUS_OPERATION_ATTRIBUTES,
Message: fmt.Sprintf("requested cancelation for a non-existing operation with scheduled event ID of %d", attrs.ScheduledEventId),
// TODO(bergundy): Message: fmt.Sprintf("requested cancelation for a non-existing or already completed operation with scheduled event ID of %d", attrs.ScheduledEventId),
}
// Fallthrough and apply the event, there's special logic that will handle state machine not found below.
} else {
return err
}
return err
}
// TODO(bergundy): Remove this when operation auto-deletes itself on terminal state.
// Operation may already be in a terminal state because it doesn't yet delete itself. We don't want to accept
// cancelation in this case.
op, err := hsm.MachineData[nexusoperations.Operation](node)
if err != nil {
return err
}
// The operation is already in a terminal state and the terminal NexusOperation event has not just been buffered.
// We allow the workflow to request canceling an operation that has just completed while a workflow task is in
// flight since it cannot know about the state of the operation.
// TODO(bergundy): When we support state machine deletion, this condition will have to change.
if !nexusoperations.TransitionCanceled.Possible(op) && !ms.HasAnyBufferedEvent(makeNexusOperationTerminalEventFilter(attrs.ScheduledEventId)) {
return workflow.FailWorkflowTaskError{
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_REQUEST_CANCEL_NEXUS_OPERATION_ATTRIBUTES,
Message: fmt.Sprintf("requested cancelation for an already complete operation with scheduled event ID of %d", attrs.ScheduledEventId),
}
}

// Always create the event even if there's a buffered completion to avoid breaking replay in the SDK.
// The event will be applied before the completion since buffered events are reordered and put at the end of the
// batch, after command events from the workflow task.
event := ms.AddHistoryEvent(enumspb.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED, func(he *historypb.HistoryEvent) {
he.Attributes = &historypb.HistoryEvent_NexusOperationCancelRequestedEventAttributes{
NexusOperationCancelRequestedEventAttributes: &historypb.NexusOperationCancelRequestedEventAttributes{
Expand All @@ -246,11 +256,8 @@ func (ch *commandHandler) HandleCancelCommand(
if errors.Is(err, hsm.ErrStateMachineAlreadyExists) {
return workflow.FailWorkflowTaskError{
Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_REQUEST_CANCEL_NEXUS_OPERATION_ATTRIBUTES,
Message: fmt.Sprintf("cancelation was already requested for an operation with scheduled event ID %d", attrs.ScheduledEventId),
Message: fmt.Sprintf("cancelation was already requested for an operation with scheduled event ID of %d", attrs.ScheduledEventId),
}
} else if errors.Is(err, hsm.ErrStateMachineNotFound) {
// This may happen if there's a buffered completion. Ignore.
return nil
}

return err
Expand Down
Loading

0 comments on commit 5ff0650

Please sign in to comment.