From 6bcd47751bab279ae8c0def935db6f2d6eb5d404 Mon Sep 17 00:00:00 2001 From: Alex Shtin Date: Thu, 30 Apr 2020 12:45:16 -0700 Subject: [PATCH] Rename Payload to Payloads (#115) * Rename Payload to Payloads. * Update go.temporal.io/temporal-proto. --- client/client.go | 4 +- go.mod | 2 +- go.sum | 6 +- internal/activity.go | 4 +- internal/client.go | 4 +- internal/encoded.go | 30 ++++----- internal/encoded_test.go | 18 +++--- internal/error_test.go | 6 +- internal/headers.go | 10 +-- internal/headers_test.go | 28 ++++---- internal/internal_activity.go | 14 ++-- internal/internal_decision_state_machine.go | 8 +-- .../internal_decision_state_machine_test.go | 4 +- internal/internal_event_handlers.go | 64 +++++++++---------- internal/internal_event_handlers_test.go | 10 +-- internal/internal_task_handlers.go | 18 +++--- internal/internal_task_handlers_test.go | 14 ++-- internal/internal_task_pollers.go | 8 +-- internal/internal_utils.go | 10 +-- internal/internal_worker.go | 18 +++--- internal/internal_worker_base.go | 16 ++--- internal/internal_worker_interfaces_test.go | 2 +- internal/internal_worker_test.go | 4 +- internal/internal_workers_test.go | 2 +- internal/internal_workflow.go | 34 +++++----- internal/internal_workflow_client.go | 12 ++-- internal/internal_workflow_client_test.go | 4 +- internal/internal_workflow_testsuite.go | 57 +++++++++-------- internal/internal_workflow_testsuite_test.go | 28 ++++---- internal/session.go | 2 +- internal/tracer.go | 2 +- internal/tracer_test.go | 8 +-- internal/workflow.go | 24 +++---- internal/workflow_testsuite.go | 6 +- 34 files changed, 240 insertions(+), 241 deletions(-) diff --git a/client/client.go b/client/client.go index b225854cb..7b918725c 100644 --- a/client/client.go +++ b/client/client.go @@ -436,7 +436,7 @@ var _ internal.NamespaceClient = NamespaceClient(nil) // which can be decoded by using: // var result string // This need to be same type as the one passed to RecordHeartbeat // NewValue(data).Get(&result) -func NewValue(data *commonpb.Payload) encoded.Value { +func NewValue(data *commonpb.Payloads) encoded.Value { return internal.NewValue(data) } @@ -447,6 +447,6 @@ func NewValue(data *commonpb.Payload) encoded.Value { // var result1 string // var result2 int // These need to be same type as those arguments passed to RecordHeartbeat // NewValues(data).Get(&result1, &result2) -func NewValues(data *commonpb.Payload) encoded.Values { +func NewValues(data *commonpb.Payloads) encoded.Values { return internal.NewValues(data) } diff --git a/go.mod b/go.mod index 0c2a176ea..a289b18d0 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/uber-go/tally v3.3.15+incompatible github.com/uber/jaeger-client-go v2.22.1+incompatible github.com/uber/jaeger-lib v2.2.0+incompatible // indirect - go.temporal.io/temporal-proto v0.20.29 + go.temporal.io/temporal-proto v0.20.30 go.uber.org/atomic v1.6.0 go.uber.org/goleak v1.0.0 go.uber.org/zap v1.14.1 diff --git a/go.sum b/go.sum index b71856fc4..f8f3f1327 100644 --- a/go.sum +++ b/go.sum @@ -93,10 +93,8 @@ github.com/uber/jaeger-client-go v2.22.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMW github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw= github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -go.temporal.io/temporal-proto v0.20.28 h1:syCkMj1bBEqPCj4dKmPQksBKH3qXeZ+PayfsBMdQEOY= -go.temporal.io/temporal-proto v0.20.28/go.mod h1:Lv8L8YBpbp0Z7V5nbvw5UD0j7x0isebhCOIDLkBqn6s= -go.temporal.io/temporal-proto v0.20.29 h1:YDKcU0qxThs9ihny93Pf2/gSdGvybKjKuCIosy54nQ8= -go.temporal.io/temporal-proto v0.20.29/go.mod h1:Lv8L8YBpbp0Z7V5nbvw5UD0j7x0isebhCOIDLkBqn6s= +go.temporal.io/temporal-proto v0.20.30 h1:QxvCfTZ1U686bmlMPTTg0F/dvMvt02m7i3jF3zWEX/E= +go.temporal.io/temporal-proto v0.20.30/go.mod h1:Lv8L8YBpbp0Z7V5nbvw5UD0j7x0isebhCOIDLkBqn6s= go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/goleak v1.0.0 h1:qsup4IcBdlmsnGfqyLl4Ntn3C2XCCuKAE7DwHpScyUo= diff --git a/internal/activity.go b/internal/activity.go index 40be2c4d9..32fb32ee3 100644 --- a/internal/activity.go +++ b/internal/activity.go @@ -205,7 +205,7 @@ func RecordActivityHeartbeat(ctx context.Context, details ...interface{}) { // no-op for local activity return } - var data *commonpb.Payload + var data *commonpb.Payloads var err error // We would like to be a able to pass in "nil" as part of details(that is no progress to report to) if len(details) > 1 || (len(details) == 1 && details[0] != nil) { @@ -226,7 +226,7 @@ func RecordActivityHeartbeat(ctx context.Context, details ...interface{}) { // Implement to unit test activities. type ServiceInvoker interface { // Returns ActivityTaskCanceledError if activity is cancelled - Heartbeat(details *commonpb.Payload) error + Heartbeat(details *commonpb.Payloads) error Close(flushBufferedHeartbeat bool) GetClient(namespace string, options ClientOptions) Client } diff --git a/internal/client.go b/internal/client.go index b597f1985..7810e8d34 100644 --- a/internal/client.go +++ b/internal/client.go @@ -684,7 +684,7 @@ func (p ParentClosePolicy) toProto() commonpb.ParentClosePolicy { // which can be decoded by using: // var result string // This need to be same type as the one passed to RecordHeartbeat // NewValue(data).Get(&result) -func NewValue(data *commonpb.Payload) Value { +func NewValue(data *commonpb.Payloads) Value { return newEncodedValue(data, nil) } @@ -695,6 +695,6 @@ func NewValue(data *commonpb.Payload) Value { // var result1 string // var result2 int // These need to be same type as those arguments passed to RecordHeartbeat // NewValues(data).Get(&result1, &result2) -func NewValues(data *commonpb.Payload) Values { +func NewValues(data *commonpb.Payloads) Values { return newEncodedValues(data, nil) } diff --git a/internal/encoded.go b/internal/encoded.go index 18b482aef..34eb941a7 100644 --- a/internal/encoded.go +++ b/internal/encoded.go @@ -68,10 +68,10 @@ type ( // 2. Activity/Workflow worker that run these activity/childWorkflow, through cleint.Options. DataConverter interface { // ToData implements conversion of a list of values. - ToData(value ...interface{}) (*commonpb.Payload, error) + ToData(value ...interface{}) (*commonpb.Payloads, error) // FromData implements conversion of an array of values of different types. // Useful for deserializing arguments of function invocations. - FromData(input *commonpb.Payload, valuePtr ...interface{}) error + FromData(input *commonpb.Payloads, valuePtr ...interface{}) error } // defaultDataConverter uses JSON. @@ -107,12 +107,12 @@ func getDefaultDataConverter() DataConverter { return DefaultDataConverter } -func (dc *defaultDataConverter) ToData(values ...interface{}) (*commonpb.Payload, error) { +func (dc *defaultDataConverter) ToData(values ...interface{}) (*commonpb.Payloads, error) { if len(values) == 0 { return nil, nil } - payload := &commonpb.Payload{} + result := &commonpb.Payloads{} for i, value := range values { nvp, ok := value.(NameValuePair) if !ok { @@ -120,9 +120,9 @@ func (dc *defaultDataConverter) ToData(values ...interface{}) (*commonpb.Payload nvp.Value = value } - var payloadItem *commonpb.PayloadItem + var payload *commonpb.Payload if bytes, isByteSlice := nvp.Value.([]byte); isByteSlice { - payloadItem = &commonpb.PayloadItem{ + payload = &commonpb.Payload{ Metadata: map[string][]byte{ metadataEncoding: []byte(metadataEncodingRaw), metadataName: []byte(nvp.Name), @@ -134,7 +134,7 @@ func (dc *defaultDataConverter) ToData(values ...interface{}) (*commonpb.Payload if err != nil { return nil, fmt.Errorf("%s: %w: %v", nvp.Name, ErrUnableToEncodeJSON, err) } - payloadItem = &commonpb.PayloadItem{ + payload = &commonpb.Payload{ Metadata: map[string][]byte{ metadataEncoding: []byte(metadataEncodingJSON), metadataName: []byte(nvp.Name), @@ -142,23 +142,23 @@ func (dc *defaultDataConverter) ToData(values ...interface{}) (*commonpb.Payload Data: data, } } - payload.Items = append(payload.Items, payloadItem) + result.Payloads = append(result.Payloads, payload) } - return payload, nil + return result, nil } -func (dc *defaultDataConverter) FromData(payload *commonpb.Payload, valuePtrs ...interface{}) error { - if payload == nil { +func (dc *defaultDataConverter) FromData(payloads *commonpb.Payloads, valuePtrs ...interface{}) error { + if payloads == nil { return nil } - for i, payloadItem := range payload.GetItems() { + for i, payload := range payloads.GetPayloads() { if i >= len(valuePtrs) { break } - metadata := payloadItem.GetMetadata() + metadata := payload.GetMetadata() if metadata == nil { return fmt.Errorf("payload item %d: %w", i, ErrMetadataIsNotSet) } @@ -183,9 +183,9 @@ func (dc *defaultDataConverter) FromData(payload *commonpb.Payload, valuePtrs .. if !valueBytes.CanSet() { return fmt.Errorf("%s: %w", name, ErrUnableToSetBytes) } - valueBytes.SetBytes(payloadItem.GetData()) + valueBytes.SetBytes(payload.GetData()) case metadataEncodingJSON: - err := json.Unmarshal(payloadItem.GetData(), valuePtrs[i]) + err := json.Unmarshal(payload.GetData(), valuePtrs[i]) if err != nil { return fmt.Errorf("%s: %w: %v", name, ErrUnableToDecodeJSON, err) } diff --git a/internal/encoded_test.go b/internal/encoded_test.go index b4666e4db..e38fbf009 100644 --- a/internal/encoded_test.go +++ b/internal/encoded_test.go @@ -104,8 +104,8 @@ func newTestDataConverter() DataConverter { return &testDataConverter{} } -func (dc *testDataConverter) ToData(values ...interface{}) (*commonpb.Payload, error) { - payload := &commonpb.Payload{} +func (dc *testDataConverter) ToData(values ...interface{}) (*commonpb.Payloads, error) { + result := &commonpb.Payloads{} for i, arg := range values { var buf bytes.Buffer @@ -114,22 +114,22 @@ func (dc *testDataConverter) ToData(values ...interface{}) (*commonpb.Payload, e return nil, fmt.Errorf("values[%d]: %w: %v", i, ErrUnableToEncodeGob, err) } - payloadItem := &commonpb.PayloadItem{ + payload := &commonpb.Payload{ Metadata: map[string][]byte{ metadataEncoding: []byte(metadataEncodingGob), metadataName: []byte(fmt.Sprintf("args[%d]", i)), }, Data: buf.Bytes(), } - payload.Items = append(payload.Items, payloadItem) + result.Payloads = append(result.Payloads, payload) } - return payload, nil + return result, nil } -func (dc *testDataConverter) FromData(payload *commonpb.Payload, valuePtrs ...interface{}) error { - for i, payloadItem := range payload.GetItems() { - encoding, ok := payloadItem.GetMetadata()[metadataEncoding] +func (dc *testDataConverter) FromData(payloads *commonpb.Payloads, valuePtrs ...interface{}) error { + for i, payload := range payloads.GetPayloads() { + encoding, ok := payload.GetMetadata()[metadataEncoding] if !ok { return fmt.Errorf("args[%d]: %w", i, ErrEncodingIsNotSet) @@ -137,7 +137,7 @@ func (dc *testDataConverter) FromData(payload *commonpb.Payload, valuePtrs ...in e := string(encoding) if e == metadataEncodingGob { - dec := gob.NewDecoder(bytes.NewBuffer(payloadItem.GetData())) + dec := gob.NewDecoder(bytes.NewBuffer(payload.GetData())) if err := dec.Decode(valuePtrs[i]); err != nil { return fmt.Errorf("args[%d]: %w: %v", i, ErrUnableToDecodeGob, err) } diff --git a/internal/error_test.go b/internal/error_test.go index a00f5b13a..ba9d6430a 100644 --- a/internal/error_test.go +++ b/internal/error_test.go @@ -128,7 +128,7 @@ func testTimeoutErrorDetails(t *testing.T, timeoutType eventpb.TimeoutType) { &decisionpb.ScheduleActivityTaskDecisionAttributes{ActivityId: activityID}) di.state = decisionStateInitiated di.setData(&scheduledActivity{ - callback: func(r *commonpb.Payload, e error) { + callback: func(r *commonpb.Payloads, e error) { actualErr = e }, }) @@ -414,7 +414,7 @@ func Test_SignalExternalWorkflowExecutionFailedError(t *testing.T) { ) di.state = decisionStateInitiated di.setData(&scheduledSignal{ - callback: func(r *commonpb.Payload, e error) { + callback: func(r *commonpb.Payloads, e error) { actualErr = e }, }) @@ -441,7 +441,7 @@ func Test_ContinueAsNewError(t *testing.T) { headerValue, err := DefaultDataConverter.ToData("test-data") assert.NoError(t, err) header := &commonpb.Header{ - Fields: map[string]*commonpb.Payload{"test": headerValue}, + Fields: map[string]*commonpb.Payloads{"test": headerValue}, } s := &WorkflowTestSuite{ diff --git a/internal/headers.go b/internal/headers.go index 0bb6fb478..189e605b7 100644 --- a/internal/headers.go +++ b/internal/headers.go @@ -32,12 +32,12 @@ import ( // HeaderWriter is an interface to write information to temporal headers type HeaderWriter interface { - Set(string, *commonpb.Payload) + Set(string, *commonpb.Payloads) } // HeaderReader is an interface to read information from temporal headers type HeaderReader interface { - ForEachKey(handler func(string, *commonpb.Payload) error) error + ForEachKey(handler func(string, *commonpb.Payloads) error) error } // ContextPropagator is an interface that determines what information from @@ -62,7 +62,7 @@ type headerReader struct { header *commonpb.Header } -func (hr *headerReader) ForEachKey(handler func(string, *commonpb.Payload) error) error { +func (hr *headerReader) ForEachKey(handler func(string, *commonpb.Payloads) error) error { if hr.header == nil { return nil } @@ -83,7 +83,7 @@ type headerWriter struct { header *commonpb.Header } -func (hw *headerWriter) Set(key string, value *commonpb.Payload) { +func (hw *headerWriter) Set(key string, value *commonpb.Payloads) { if hw.header == nil { return } @@ -93,7 +93,7 @@ func (hw *headerWriter) Set(key string, value *commonpb.Payload) { // NewHeaderWriter returns a header writer interface func NewHeaderWriter(header *commonpb.Header) HeaderWriter { if header != nil && header.Fields == nil { - header.Fields = make(map[string]*commonpb.Payload) + header.Fields = make(map[string]*commonpb.Payloads) } return &headerWriter{header} } diff --git a/internal/headers_test.go b/internal/headers_test.go index b371fedcd..eda74b942 100644 --- a/internal/headers_test.go +++ b/internal/headers_test.go @@ -37,30 +37,30 @@ func TestHeaderWriter(t *testing.T) { name string initial *commonpb.Header expected *commonpb.Header - vals map[string]*commonpb.Payload + vals map[string]*commonpb.Payloads }{ { "no values", &commonpb.Header{ - Fields: map[string]*commonpb.Payload{}, + Fields: map[string]*commonpb.Payloads{}, }, &commonpb.Header{ - Fields: map[string]*commonpb.Payload{}, + Fields: map[string]*commonpb.Payloads{}, }, - map[string]*commonpb.Payload{}, + map[string]*commonpb.Payloads{}, }, { "add values", &commonpb.Header{ - Fields: map[string]*commonpb.Payload{}, + Fields: map[string]*commonpb.Payloads{}, }, &commonpb.Header{ - Fields: map[string]*commonpb.Payload{ + Fields: map[string]*commonpb.Payloads{ "key1": encodeString(t, "val1"), "key2": encodeString(t, "val2"), }, }, - map[string]*commonpb.Payload{ + map[string]*commonpb.Payloads{ "key1": encodeString(t, "val1"), "key2": encodeString(t, "val2"), }, @@ -68,17 +68,17 @@ func TestHeaderWriter(t *testing.T) { { "overwrite values", &commonpb.Header{ - Fields: map[string]*commonpb.Payload{ + Fields: map[string]*commonpb.Payloads{ "key1": encodeString(t, "unexpected"), }, }, &commonpb.Header{ - Fields: map[string]*commonpb.Payload{ + Fields: map[string]*commonpb.Payloads{ "key1": encodeString(t, "val1"), "key2": encodeString(t, "val2"), }, }, - map[string]*commonpb.Payload{ + map[string]*commonpb.Payloads{ "key1": encodeString(t, "val1"), "key2": encodeString(t, "val2"), }, @@ -98,7 +98,7 @@ func TestHeaderWriter(t *testing.T) { } } -func encodeString(t *testing.T, s string) *commonpb.Payload { +func encodeString(t *testing.T, s string) *commonpb.Payloads { p, err := DefaultDataConverter.ToData(s) assert.NoError(t, err) return p @@ -115,7 +115,7 @@ func TestHeaderReader(t *testing.T) { { "valid values", &commonpb.Header{ - Fields: map[string]*commonpb.Payload{ + Fields: map[string]*commonpb.Payloads{ "key1": encodeString(t, "val1"), "key2": encodeString(t, "val2"), }, @@ -126,7 +126,7 @@ func TestHeaderReader(t *testing.T) { { "invalid values", &commonpb.Header{ - Fields: map[string]*commonpb.Payload{ + Fields: map[string]*commonpb.Payloads{ "key1": encodeString(t, "val1"), "key2": encodeString(t, "val2"), }, @@ -141,7 +141,7 @@ func TestHeaderReader(t *testing.T) { t.Run(test.name, func(t *testing.T) { t.Parallel() reader := NewHeaderReader(test.header) - err := reader.ForEachKey(func(key string, _ *commonpb.Payload) error { + err := reader.ForEachKey(func(key string, _ *commonpb.Payloads) error { if _, ok := test.keys[key]; !ok { return assert.AnError } diff --git a/internal/internal_activity.go b/internal/internal_activity.go index dedd81d97..4240d8322 100644 --- a/internal/internal_activity.go +++ b/internal/internal_activity.go @@ -42,7 +42,7 @@ import ( type ( // activity is an interface of an activity implementation. activity interface { - Execute(ctx context.Context, input *commonpb.Payload) (*commonpb.Payload, error) + Execute(ctx context.Context, input *commonpb.Payloads) (*commonpb.Payloads, error) ActivityType() ActivityType GetFunction() interface{} } @@ -77,7 +77,7 @@ type ( executeActivityParams struct { activityOptions ActivityType ActivityType - Input *commonpb.Payload + Input *commonpb.Payloads DataConverter DataConverter Header *commonpb.Header } @@ -129,7 +129,7 @@ type ( taskList string dataConverter DataConverter attempt int32 // starts from 0. - heartbeatDetails *commonpb.Payload + heartbeatDetails *commonpb.Payloads workflowType *WorkflowType workflowNamespace string workerStopChannel <-chan struct{} @@ -268,7 +268,7 @@ func isActivityContext(inType reflect.Type) bool { return inType != nil && inType.Implements(contextElem) } -func validateFunctionAndGetResults(f interface{}, values []reflect.Value, dataConverter DataConverter) (*commonpb.Payload, error) { +func validateFunctionAndGetResults(f interface{}, values []reflect.Value, dataConverter DataConverter) (*commonpb.Payloads, error) { resultSize := len(values) if resultSize < 1 || resultSize > 2 { @@ -278,14 +278,14 @@ func validateFunctionAndGetResults(f interface{}, values []reflect.Value, dataCo fnName, resultSize) } - var result *commonpb.Payload + var result *commonpb.Payloads // Parse result if resultSize > 1 { retValue := values[0] var ok bool - if result, ok = retValue.Interface().(*commonpb.Payload); !ok { + if result, ok = retValue.Interface().(*commonpb.Payloads); !ok { if retValue.Kind() != reflect.Ptr || !retValue.IsNil() { var err error if result, err = encodeArg(dataConverter, retValue.Interface()); err != nil { @@ -309,7 +309,7 @@ func validateFunctionAndGetResults(f interface{}, values []reflect.Value, dataCo return result, errInterface } -func serializeResults(f interface{}, results []interface{}, dataConverter DataConverter) (result *commonpb.Payload, err error) { +func serializeResults(f interface{}, results []interface{}, dataConverter DataConverter) (result *commonpb.Payloads, err error) { // results contain all results including error resultSize := len(results) diff --git a/internal/internal_decision_state_machine.go b/internal/internal_decision_state_machine.go index edf8e5421..b77e93fa4 100644 --- a/internal/internal_decision_state_machine.go +++ b/internal/internal_decision_state_machine.go @@ -797,7 +797,7 @@ func (h *decisionsHelper) recordVersionMarker(changeID string, version Version, return decision } -func (h *decisionsHelper) recordSideEffectMarker(sideEffectID int32, data *commonpb.Payload) decisionStateMachine { +func (h *decisionsHelper) recordSideEffectMarker(sideEffectID int32, data *commonpb.Payloads) decisionStateMachine { markerID := fmt.Sprintf("%v_%v", sideEffectMarkerName, sideEffectID) attributes := &decisionpb.RecordMarkerDecisionAttributes{ MarkerName: sideEffectMarkerName, @@ -808,7 +808,7 @@ func (h *decisionsHelper) recordSideEffectMarker(sideEffectID int32, data *commo return decision } -func (h *decisionsHelper) recordLocalActivityMarker(activityID string, result *commonpb.Payload) decisionStateMachine { +func (h *decisionsHelper) recordLocalActivityMarker(activityID string, result *commonpb.Payloads) decisionStateMachine { markerID := fmt.Sprintf("%v_%v", localActivityMarkerName, activityID) attributes := &decisionpb.RecordMarkerDecisionAttributes{ MarkerName: localActivityMarkerName, @@ -819,7 +819,7 @@ func (h *decisionsHelper) recordLocalActivityMarker(activityID string, result *c return decision } -func (h *decisionsHelper) recordMutableSideEffectMarker(mutableSideEffectID string, data *commonpb.Payload) decisionStateMachine { +func (h *decisionsHelper) recordMutableSideEffectMarker(mutableSideEffectID string, data *commonpb.Payloads) decisionStateMachine { markerID := fmt.Sprintf("%v_%v", mutableSideEffectMarkerName, mutableSideEffectID) attributes := &decisionpb.RecordMarkerDecisionAttributes{ MarkerName: mutableSideEffectMarkerName, @@ -934,7 +934,7 @@ func (h *decisionsHelper) handleRequestCancelExternalWorkflowExecutionFailed(ini return isExternal, decision } -func (h *decisionsHelper) signalExternalWorkflowExecution(namespace, workflowID, runID, signalName string, input *commonpb.Payload, signalID string, childWorkflowOnly bool) decisionStateMachine { +func (h *decisionsHelper) signalExternalWorkflowExecution(namespace, workflowID, runID, signalName string, input *commonpb.Payloads, signalID string, childWorkflowOnly bool) decisionStateMachine { attributes := &decisionpb.SignalExternalWorkflowExecutionDecisionAttributes{ Namespace: namespace, Execution: &executionpb.WorkflowExecution{ diff --git a/internal/internal_decision_state_machine_test.go b/internal/internal_decision_state_machine_test.go index 964ec8558..7f778a8de 100644 --- a/internal/internal_decision_state_machine_test.go +++ b/internal/internal_decision_state_machine_test.go @@ -156,7 +156,7 @@ func Test_TimerCancelEventOrdering(t *testing.T) { require.Equal(t, attributes, decisions[0].GetStartTimerDecisionAttributes()) h.handleTimerStarted(timerID) require.Equal(t, decisionStateInitiated, d.getState()) - m := h.recordLocalActivityMarker(localActivityID, &commonpb.Payload{}) + m := h.recordLocalActivityMarker(localActivityID, &commonpb.Payloads{}) require.Equal(t, decisionStateCreated, m.getState()) h.cancelTimer(timerID) require.Equal(t, decisionStateCanceledAfterInitiated, d.getState()) @@ -508,7 +508,7 @@ func Test_MarkerStateMachine(t *testing.T) { h := newDecisionsHelper() // record marker for side effect - d := h.recordSideEffectMarker(1, &commonpb.Payload{}) + d := h.recordSideEffectMarker(1, &commonpb.Payloads{}) require.Equal(t, decisionStateCreated, d.getState()) // send decisions diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index 6580bdb51..472986fdc 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -59,7 +59,7 @@ var _ workflowExecutionEventHandler = (*workflowExecutionEventHandlerImpl)(nil) type ( // completionHandler Handler to indicate completion result - completionHandler func(result *commonpb.Payload, err error) + completionHandler func(result *commonpb.Payloads, err error) // workflowExecutionEventHandlerImpl handler to handle workflowExecutionEventHandler workflowExecutionEventHandlerImpl struct { @@ -100,10 +100,10 @@ type ( workflowInfo *WorkflowInfo decisionsHelper *decisionsHelper - sideEffectResult map[int32]*commonpb.Payload + sideEffectResult map[int32]*commonpb.Payloads changeVersions map[string]Version pendingLaTasks map[string]*localActivityTask - mutableSideEffect map[string]*commonpb.Payload + mutableSideEffect map[string]*commonpb.Payloads unstartedLaTasks map[string]struct{} openSessions map[string]*SessionInfo @@ -111,10 +111,10 @@ type ( currentReplayTime time.Time // Indicates current replay time of the decision. currentLocalTime time.Time // Local time when currentReplayTime was updated. - completeHandler completionHandler // events completion handler - cancelHandler func() // A cancel handler to be invoked on a cancel notification - signalHandler func(name string, input *commonpb.Payload) // A signal handler to be invoked on a signal event - queryHandler func(queryType string, queryArgs *commonpb.Payload) (*commonpb.Payload, error) + completeHandler completionHandler // events completion handler + cancelHandler func() // A cancel handler to be invoked on a cancel notification + signalHandler func(name string, input *commonpb.Payloads) // A signal handler to be invoked on a signal event + queryHandler func(queryType string, queryArgs *commonpb.Payloads) (*commonpb.Payloads, error) logger *zap.Logger isReplay bool // flag to indicate if workflow is in replay mode @@ -145,8 +145,8 @@ type ( ActivityID string ActivityType string ErrReason string - Err *commonpb.Payload - Result *commonpb.Payload + Err *commonpb.Payloads + Result *commonpb.Payloads ReplayTime time.Time Attempt int32 // record attempt, starting from 0. Backoff time.Duration // retry backoff duration. @@ -192,8 +192,8 @@ func newWorkflowExecutionEventHandler( context := &workflowEnvironmentImpl{ workflowInfo: workflowInfo, decisionsHelper: newDecisionsHelper(), - sideEffectResult: make(map[int32]*commonpb.Payload), - mutableSideEffect: make(map[string]*commonpb.Payload), + sideEffectResult: make(map[int32]*commonpb.Payloads), + mutableSideEffect: make(map[string]*commonpb.Payloads), changeVersions: make(map[string]Version), pendingLaTasks: make(map[string]*localActivityTask), unstartedLaTasks: make(map[string]struct{}), @@ -219,7 +219,7 @@ func newWorkflowExecutionEventHandler( return &workflowExecutionEventHandlerImpl{context, nil} } -func (s *scheduledTimer) handle(result *commonpb.Payload, err error) { +func (s *scheduledTimer) handle(result *commonpb.Payloads, err error) { if s.handled { panic(fmt.Sprintf("timer already handled %v", s)) } @@ -227,7 +227,7 @@ func (s *scheduledTimer) handle(result *commonpb.Payload, err error) { s.callback(result, err) } -func (s *scheduledActivity) handle(result *commonpb.Payload, err error) { +func (s *scheduledActivity) handle(result *commonpb.Payloads, err error) { if s.handled { panic(fmt.Sprintf("activity already handled %v", s)) } @@ -235,7 +235,7 @@ func (s *scheduledActivity) handle(result *commonpb.Payload, err error) { s.callback(result, err) } -func (s *scheduledChildWorkflow) handle(result *commonpb.Payload, err error) { +func (s *scheduledChildWorkflow) handle(result *commonpb.Payloads, err error) { if s.handled { panic(fmt.Sprintf("child workflow already handled %v", s)) } @@ -252,7 +252,7 @@ func (t *localActivityTask) cancel() { t.Unlock() } -func (s *scheduledCancellation) handle(result *commonpb.Payload, err error) { +func (s *scheduledCancellation) handle(result *commonpb.Payloads, err error) { if s.handled { panic(fmt.Sprintf("cancellation already handled %v", s)) } @@ -260,7 +260,7 @@ func (s *scheduledCancellation) handle(result *commonpb.Payload, err error) { s.callback(result, err) } -func (s *scheduledSignal) handle(result *commonpb.Payload, err error) { +func (s *scheduledSignal) handle(result *commonpb.Payloads, err error) { if s.handled { panic(fmt.Sprintf("signal already handled %v", s)) } @@ -272,7 +272,7 @@ func (wc *workflowEnvironmentImpl) WorkflowInfo() *WorkflowInfo { return wc.workflowInfo } -func (wc *workflowEnvironmentImpl) Complete(result *commonpb.Payload, err error) { +func (wc *workflowEnvironmentImpl) Complete(result *commonpb.Payloads, err error) { wc.completeHandler(result, err) } @@ -289,7 +289,7 @@ func (wc *workflowEnvironmentImpl) RequestCancelExternalWorkflow(namespace, work } func (wc *workflowEnvironmentImpl) SignalExternalWorkflow(namespace, workflowID, runID, signalName string, - input *commonpb.Payload, _ /* THIS IS FOR TEST FRAMEWORK. DO NOT USE HERE. */ interface{}, childWorkflowOnly bool, callback resultHandler) { + input *commonpb.Payloads, _ /* THIS IS FOR TEST FRAMEWORK. DO NOT USE HERE. */ interface{}, childWorkflowOnly bool, callback resultHandler) { signalID := wc.GenerateSequenceID() decision := wc.decisionsHelper.signalExternalWorkflowExecution(namespace, workflowID, runID, signalName, input, signalID, childWorkflowOnly) @@ -326,7 +326,7 @@ func mergeSearchAttributes(current, upsert *commonpb.SearchAttributes) *commonpb return nil } current = &commonpb.SearchAttributes{ - IndexedFields: make(map[string]*commonpb.Payload), + IndexedFields: make(map[string]*commonpb.Payloads), } } @@ -400,11 +400,11 @@ func (wc *workflowEnvironmentImpl) ExecuteChildWorkflow( return nil } -func (wc *workflowEnvironmentImpl) RegisterSignalHandler(handler func(name string, input *commonpb.Payload)) { +func (wc *workflowEnvironmentImpl) RegisterSignalHandler(handler func(name string, input *commonpb.Payloads)) { wc.signalHandler = handler } -func (wc *workflowEnvironmentImpl) RegisterQueryHandler(handler func(string, *commonpb.Payload) (*commonpb.Payload, error)) { +func (wc *workflowEnvironmentImpl) RegisterQueryHandler(handler func(string, *commonpb.Payloads) (*commonpb.Payloads, error)) { wc.queryHandler = handler } @@ -620,10 +620,10 @@ func getChangeVersion(changeID string, version Version) string { return fmt.Sprintf("%s-%v", changeID, version) } -func (wc *workflowEnvironmentImpl) SideEffect(f func() (*commonpb.Payload, error), callback resultHandler) { +func (wc *workflowEnvironmentImpl) SideEffect(f func() (*commonpb.Payloads, error), callback resultHandler) { sideEffectID := wc.GenerateSequence() - var details *commonpb.Payload - var result *commonpb.Payload + var details *commonpb.Payloads + var result *commonpb.Payloads if wc.isReplay { var ok bool result, ok = wc.sideEffectResult[sideEffectID] @@ -681,7 +681,7 @@ func (wc *workflowEnvironmentImpl) MutableSideEffect(id string, f func() interfa return wc.recordMutableSideEffect(id, wc.encodeValue(f())) } -func (wc *workflowEnvironmentImpl) isEqualValue(newValue interface{}, encodedOldValue *commonpb.Payload, equals func(a, b interface{}) bool) bool { +func (wc *workflowEnvironmentImpl) isEqualValue(newValue interface{}, encodedOldValue *commonpb.Payloads, equals func(a, b interface{}) bool) bool { if newValue == nil { // new value is nil newEncodedValue := wc.encodeValue(nil) @@ -702,7 +702,7 @@ func decodeValue(encodedValue Value, value interface{}) interface{} { return decodedValue } -func (wc *workflowEnvironmentImpl) encodeValue(value interface{}) *commonpb.Payload { +func (wc *workflowEnvironmentImpl) encodeValue(value interface{}) *commonpb.Payloads { payload, err := wc.encodeArg(value) if err != nil { panic(err) @@ -710,11 +710,11 @@ func (wc *workflowEnvironmentImpl) encodeValue(value interface{}) *commonpb.Payl return payload } -func (wc *workflowEnvironmentImpl) encodeArg(arg interface{}) (*commonpb.Payload, error) { +func (wc *workflowEnvironmentImpl) encodeArg(arg interface{}) (*commonpb.Payloads, error) { return wc.GetDataConverter().ToData(arg) } -func (wc *workflowEnvironmentImpl) recordMutableSideEffect(id string, data *commonpb.Payload) Value { +func (wc *workflowEnvironmentImpl) recordMutableSideEffect(id string, data *commonpb.Payloads) Value { details, err := encodeArgs(wc.GetDataConverter(), []interface{}{id, data}) if err != nil { panic(err) @@ -917,7 +917,7 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent( return nil } -func (weh *workflowExecutionEventHandlerImpl) ProcessQuery(queryType string, queryArgs *commonpb.Payload) (*commonpb.Payload, error) { +func (weh *workflowExecutionEventHandlerImpl) ProcessQuery(queryType string, queryArgs *commonpb.Payloads) (*commonpb.Payloads, error) { switch queryType { case QueryTypeStackTrace: return weh.encodeArg(weh.StackTrace()) @@ -1055,7 +1055,7 @@ func (weh *workflowExecutionEventHandlerImpl) handleMarkerRecorded( switch attributes.GetMarkerName() { case sideEffectMarkerName: var sideEffectID int32 - var result *commonpb.Payload + var result *commonpb.Payloads _ = encodedValues.Get(&sideEffectID, result) weh.sideEffectResult[sideEffectID] = result return nil @@ -1069,7 +1069,7 @@ func (weh *workflowExecutionEventHandlerImpl) handleMarkerRecorded( return weh.handleLocalActivityMarker(attributes.Details) case mutableSideEffectMarkerName: var fixedID string - var result *commonpb.Payload + var result *commonpb.Payloads _ = encodedValues.Get(&fixedID, result) weh.mutableSideEffect[fixedID] = result return nil @@ -1079,7 +1079,7 @@ func (weh *workflowExecutionEventHandlerImpl) handleMarkerRecorded( } } -func (weh *workflowExecutionEventHandlerImpl) handleLocalActivityMarker(markerData *commonpb.Payload) error { +func (weh *workflowExecutionEventHandlerImpl) handleLocalActivityMarker(markerData *commonpb.Payloads) error { lamd := localActivityMarkerData{} if err := weh.dataConverter.FromData(markerData, &lamd); err != nil { diff --git a/internal/internal_event_handlers_test.go b/internal/internal_event_handlers_test.go index d552de192..08f74d6a1 100644 --- a/internal/internal_event_handlers_test.go +++ b/internal/internal_event_handlers_test.go @@ -189,7 +189,7 @@ func Test_UpsertSearchAttributes(t *testing.T) { func Test_MergeSearchAttributes(t *testing.T) { t.Parallel() - encodeString := func(str string) *commonpb.Payload { + encodeString := func(str string) *commonpb.Payloads { payload, _ := DefaultDataConverter.ToData(str) return payload } @@ -208,26 +208,26 @@ func Test_MergeSearchAttributes(t *testing.T) { }, { name: "currentIsEmpty", - current: &commonpb.SearchAttributes{IndexedFields: make(map[string]*commonpb.Payload)}, + current: &commonpb.SearchAttributes{IndexedFields: make(map[string]*commonpb.Payloads)}, upsert: &commonpb.SearchAttributes{}, expected: nil, }, { name: "normalMerge", current: &commonpb.SearchAttributes{ - IndexedFields: map[string]*commonpb.Payload{ + IndexedFields: map[string]*commonpb.Payloads{ "CustomIntField": encodeString(`1`), "CustomKeywordField": encodeString(`keyword`), }, }, upsert: &commonpb.SearchAttributes{ - IndexedFields: map[string]*commonpb.Payload{ + IndexedFields: map[string]*commonpb.Payloads{ "CustomIntField": encodeString(`2`), "CustomBoolField": encodeString(`true`), }, }, expected: &commonpb.SearchAttributes{ - IndexedFields: map[string]*commonpb.Payload{ + IndexedFields: map[string]*commonpb.Payloads{ "CustomIntField": encodeString(`2`), "CustomKeywordField": encodeString(`keyword`), "CustomBoolField": encodeString(`true`), diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index 031a9c1d5..16ea4b00b 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -70,7 +70,7 @@ type ( // Return List of decisions made, any error. ProcessEvent(event *eventpb.HistoryEvent, isReplay bool, isLast bool) error // ProcessQuery process a query request. - ProcessQuery(queryType string, queryArgs *commonpb.Payload) (*commonpb.Payload, error) + ProcessQuery(queryType string, queryArgs *commonpb.Payloads) (*commonpb.Payloads, error) StackTrace() string // Close for cleaning up resources on this event handler Close() @@ -108,7 +108,7 @@ type ( eventHandler atomic.Value isWorkflowCompleted bool - result *commonpb.Payload + result *commonpb.Payloads err error previousStartedEventID int64 @@ -496,7 +496,7 @@ func (w *workflowExecutionContextImpl) getEventHandler() *workflowExecutionEvent return eventHandlerImpl } -func (w *workflowExecutionContextImpl) completeWorkflow(result *commonpb.Payload, err error) { +func (w *workflowExecutionContextImpl) completeWorkflow(result *commonpb.Payloads, err error) { w.isWorkflowCompleted = true w.result = result w.err = err @@ -1617,12 +1617,12 @@ type temporalInvoker struct { cancelHandler func() heartBeatTimeoutInSec int32 // The heart beat interval configured for this activity. hbBatchEndTimer *time.Timer // Whether we started a batch of operations that need to be reported in the cycle. This gets started on a user call. - lastDetailsToReport **commonpb.Payload + lastDetailsToReport **commonpb.Payloads closeCh chan struct{} workerStopChannel <-chan struct{} } -func (i *temporalInvoker) Heartbeat(details *commonpb.Payload) error { +func (i *temporalInvoker) Heartbeat(details *commonpb.Payloads) error { i.Lock() defer i.Unlock() @@ -1663,7 +1663,7 @@ func (i *temporalInvoker) Heartbeat(details *commonpb.Payload) error { } // We close the batch and report the progress. - var detailsToReport **commonpb.Payload + var detailsToReport **commonpb.Payloads i.Lock() detailsToReport = i.lastDetailsToReport @@ -1680,7 +1680,7 @@ func (i *temporalInvoker) Heartbeat(details *commonpb.Payload) error { return err } -func (i *temporalInvoker) internalHeartBeat(details *commonpb.Payload) (bool, error) { +func (i *temporalInvoker) internalHeartBeat(details *commonpb.Payloads) (bool, error) { isActivityCancelled := false timeout := time.Duration(i.heartBeatTimeoutInSec) * time.Second if timeout <= 0 { @@ -1855,7 +1855,7 @@ func recordActivityHeartbeat( service workflowservice.WorkflowServiceClient, identity string, taskToken []byte, - details *commonpb.Payload, + details *commonpb.Payloads, ) error { request := &workflowservice.RecordActivityTaskHeartbeatRequest{ TaskToken: taskToken, @@ -1885,7 +1885,7 @@ func recordActivityHeartbeatByID( service workflowservice.WorkflowServiceClient, identity string, namespace, workflowID, runID, activityID string, - details *commonpb.Payload, + details *commonpb.Payloads, ) error { request := &workflowservice.RecordActivityTaskHeartbeatByIdRequest{ Namespace: namespace, diff --git a/internal/internal_task_handlers_test.go b/internal/internal_task_handlers_test.go index 3c40743ec..de694afd3 100644 --- a/internal/internal_task_handlers_test.go +++ b/internal/internal_task_handlers_test.go @@ -202,13 +202,13 @@ func createTestEventWorkflowExecutionSignaled(eventID int64, signalName string) return createTestEventWorkflowExecutionSignaledWithPayload(eventID, signalName, nil) } -func createTestEventWorkflowExecutionSignaledWithPayload(eventID int64, signalName string, payload *commonpb.Payload) *eventpb.HistoryEvent { +func createTestEventWorkflowExecutionSignaledWithPayload(eventID int64, signalName string, payloads *commonpb.Payloads) *eventpb.HistoryEvent { return &eventpb.HistoryEvent{ EventId: eventID, EventType: eventpb.EventType_WorkflowExecutionSignaled, Attributes: &eventpb.HistoryEvent_WorkflowExecutionSignaledEventAttributes{WorkflowExecutionSignaledEventAttributes: &eventpb.WorkflowExecutionSignaledEventAttributes{ SignalName: signalName, - Input: payload, + Input: payloads, Identity: "test-identity", }}, } @@ -1262,7 +1262,7 @@ type testActivityDeadline struct { d time.Duration } -func (t *testActivityDeadline) Execute(ctx context.Context, _ *commonpb.Payload) (*commonpb.Payload, error) { +func (t *testActivityDeadline) Execute(ctx context.Context, _ *commonpb.Payloads) (*commonpb.Payloads, error) { if d, _ := ctx.Deadline(); d.IsZero() { panic("invalid deadline provided") } @@ -1504,7 +1504,7 @@ func Test_IsDecisionMatchEvent_UpsertWorkflowSearchAttributes(t *testing.T) { } func Test_IsSearchAttributesMatched(t *testing.T) { - encodeString := func(str string) *commonpb.Payload { + encodeString := func(str string) *commonpb.Payloads { payload, _ := DefaultDataConverter.ToData(str) return payload } @@ -1536,7 +1536,7 @@ func Test_IsSearchAttributesMatched(t *testing.T) { { name: "not match", lhs: &commonpb.SearchAttributes{ - IndexedFields: map[string]*commonpb.Payload{ + IndexedFields: map[string]*commonpb.Payloads{ "key1": encodeString("1"), "key2": encodeString("abc"), }, @@ -1547,13 +1547,13 @@ func Test_IsSearchAttributesMatched(t *testing.T) { { name: "match", lhs: &commonpb.SearchAttributes{ - IndexedFields: map[string]*commonpb.Payload{ + IndexedFields: map[string]*commonpb.Payloads{ "key1": encodeString("1"), "key2": encodeString("abc"), }, }, rhs: &commonpb.SearchAttributes{ - IndexedFields: map[string]*commonpb.Payload{ + IndexedFields: map[string]*commonpb.Payloads{ "key2": encodeString("abc"), "key1": encodeString("1"), }, diff --git a/internal/internal_task_pollers.go b/internal/internal_task_pollers.go index 4699ddf71..310ac1d05 100644 --- a/internal/internal_task_pollers.go +++ b/internal/internal_task_pollers.go @@ -136,7 +136,7 @@ type ( } localActivityResult struct { - result *commonpb.Payload + result *commonpb.Payloads err error task *localActivityTask backoff time.Duration @@ -530,7 +530,7 @@ func (lath *localActivityTaskHandler) executeLocalActivityTask(task *localActivi task.cancelFunc = cancel task.Unlock() - var laResult *commonpb.Payload + var laResult *commonpb.Payloads var err error doneCh := make(chan struct{}) go func(ch chan struct{}) { @@ -1008,7 +1008,7 @@ func reportActivityCompleteByID(ctx context.Context, service workflowservice.Wor return reportErr } -func convertActivityResultToRespondRequest(identity string, taskToken []byte, result *commonpb.Payload, err error, +func convertActivityResultToRespondRequest(identity string, taskToken []byte, result *commonpb.Payloads, err error, dataConverter DataConverter) interface{} { if err == ErrActivityResultPending { // activity result is pending and will be completed asynchronously. @@ -1039,7 +1039,7 @@ func convertActivityResultToRespondRequest(identity string, taskToken []byte, re } func convertActivityResultToRespondRequestByID(identity, namespace, workflowID, runID, activityID string, - result *commonpb.Payload, err error, dataConverter DataConverter) interface{} { + result *commonpb.Payloads, err error, dataConverter DataConverter) interface{} { if err == ErrActivityResultPending { // activity result is pending and will be completed asynchronously. // nothing to report at this point diff --git a/internal/internal_utils.go b/internal/internal_utils.go index 536d1de11..3138caf5e 100644 --- a/internal/internal_utils.go +++ b/internal/internal_utils.go @@ -147,10 +147,10 @@ func getWorkerTaskList(stickyUUID string) string { } // getErrorDetails gets reason and details. -func getErrorDetails(err error, dataConverter DataConverter) (string, *commonpb.Payload) { +func getErrorDetails(err error, dataConverter DataConverter) (string, *commonpb.Payloads) { switch err := err.(type) { case *CustomError: - var data *commonpb.Payload + var data *commonpb.Payloads var err0 error switch details := err.details.(type) { case ErrorDetailsValues: @@ -165,7 +165,7 @@ func getErrorDetails(err error, dataConverter DataConverter) (string, *commonpb. } return err.Reason(), data case *CanceledError: - var data *commonpb.Payload + var data *commonpb.Payloads var err0 error switch details := err.details.(type) { case ErrorDetailsValues: @@ -186,7 +186,7 @@ func getErrorDetails(err error, dataConverter DataConverter) (string, *commonpb. } return errReasonPanic, data case *TimeoutError: - var data *commonpb.Payload + var data *commonpb.Payloads var err0 error switch details := err.details.(type) { case ErrorDetailsValues: @@ -211,7 +211,7 @@ func getErrorDetails(err error, dataConverter DataConverter) (string, *commonpb. } // constructError construct error from reason and details sending down from server. -func constructError(reason string, details *commonpb.Payload, dataConverter DataConverter) error { +func constructError(reason string, details *commonpb.Payloads, dataConverter DataConverter) error { if strings.HasPrefix(reason, errReasonTimeout) { details := newEncodedValues(details, dataConverter) timeoutType, err := getTimeoutTypeFromErrReason(reason) diff --git a/internal/internal_worker.go b/internal/internal_worker.go index 46c72faa3..dcd8a486c 100644 --- a/internal/internal_worker.go +++ b/internal/internal_worker.go @@ -744,12 +744,12 @@ func validateFnFormat(fnType reflect.Type, isWorkflow bool) error { } // encode multiple arguments(arguments to a function). -func encodeArgs(dc DataConverter, args []interface{}) (*commonpb.Payload, error) { +func encodeArgs(dc DataConverter, args []interface{}) (*commonpb.Payloads, error) { return dc.ToData(args...) } // decode multiple arguments(arguments to a function). -func decodeArgs(dc DataConverter, fnType reflect.Type, data *commonpb.Payload) (result []reflect.Value, err error) { +func decodeArgs(dc DataConverter, fnType reflect.Type, data *commonpb.Payloads) (result []reflect.Value, err error) { r, err := decodeArgsToValues(dc, fnType, data) if err != nil { return @@ -760,7 +760,7 @@ func decodeArgs(dc DataConverter, fnType reflect.Type, data *commonpb.Payload) ( return } -func decodeArgsToValues(dc DataConverter, fnType reflect.Type, data *commonpb.Payload) (result []interface{}, err error) { +func decodeArgsToValues(dc DataConverter, fnType reflect.Type, data *commonpb.Payloads) (result []interface{}, err error) { argsLoop: for i := 0; i < fnType.NumIn(); i++ { argT := fnType.In(i) @@ -778,12 +778,12 @@ argsLoop: } // encode single value(like return parameter). -func encodeArg(dc DataConverter, arg interface{}) (*commonpb.Payload, error) { +func encodeArg(dc DataConverter, arg interface{}) (*commonpb.Payloads, error) { return dc.ToData(arg) } // decode single value(like return parameter). -func decodeArg(dc DataConverter, data *commonpb.Payload, to interface{}) error { +func decodeArg(dc DataConverter, data *commonpb.Payloads, to interface{}) error { return dc.FromData(data, to) } @@ -794,7 +794,7 @@ func decodeAndAssignValue(dc DataConverter, from interface{}, toValuePtr interfa if rf := reflect.ValueOf(toValuePtr); rf.Type().Kind() != reflect.Ptr { return errors.New("value parameter provided is not a pointer") } - if data, ok := from.(*commonpb.Payload); ok { + if data, ok := from.(*commonpb.Payloads); ok { if err := decodeArg(dc, data, toValuePtr); err != nil { return err } @@ -826,7 +826,7 @@ type workflowExecutor struct { interceptors []WorkflowInterceptorFactory } -func (we *workflowExecutor) Execute(ctx Context, input *commonpb.Payload) (*commonpb.Payload, error) { +func (we *workflowExecutor) Execute(ctx Context, input *commonpb.Payloads) (*commonpb.Payloads, error) { var args []interface{} dataConverter := getWorkflowEnvOptions(ctx).dataConverter fnType := reflect.TypeOf(we.fn) @@ -859,7 +859,7 @@ func (ae *activityExecutor) GetFunction() interface{} { return ae.fn } -func (ae *activityExecutor) Execute(ctx context.Context, input *commonpb.Payload) (*commonpb.Payload, error) { +func (ae *activityExecutor) Execute(ctx context.Context, input *commonpb.Payloads) (*commonpb.Payloads, error) { fnType := reflect.TypeOf(ae.fn) var args []reflect.Value dataConverter := getDataConverterFromActivityCtx(ctx) @@ -882,7 +882,7 @@ func (ae *activityExecutor) Execute(ctx context.Context, input *commonpb.Payload return validateFunctionAndGetResults(ae.fn, retValues, dataConverter) } -func (ae *activityExecutor) ExecuteWithActualArgs(ctx context.Context, actualArgs []interface{}) (*commonpb.Payload, error) { +func (ae *activityExecutor) ExecuteWithActualArgs(ctx context.Context, actualArgs []interface{}) (*commonpb.Payloads, error) { retValues := ae.executeWithActualArgsWithoutParseResult(ctx, actualArgs) dataConverter := getDataConverterFromActivityCtx(ctx) diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go index c81d653fc..d05edffb2 100644 --- a/internal/internal_worker_base.go +++ b/internal/internal_worker_base.go @@ -57,12 +57,12 @@ var errShutdown = errors.New("worker shutting down") type ( // resultHandler that returns result - resultHandler func(result *commonpb.Payload, err error) + resultHandler func(result *commonpb.Payloads, err error) laResultHandler func(lar *localActivityResultWrapper) localActivityResultWrapper struct { err error - result *commonpb.Payload + result *commonpb.Payloads attempt int32 backoff time.Duration } @@ -73,19 +73,19 @@ type ( asyncActivityClient localActivityClient workflowTimerClient - SideEffect(f func() (*commonpb.Payload, error), callback resultHandler) + SideEffect(f func() (*commonpb.Payloads, error), callback resultHandler) GetVersion(changeID string, minSupported, maxSupported Version) Version WorkflowInfo() *WorkflowInfo - Complete(result *commonpb.Payload, err error) + Complete(result *commonpb.Payloads, err error) RegisterCancelHandler(handler func()) RequestCancelChildWorkflow(namespace, workflowID string) RequestCancelExternalWorkflow(namespace, workflowID, runID string, callback resultHandler) ExecuteChildWorkflow(params executeWorkflowParams, callback resultHandler, startedHandler func(r WorkflowExecution, e error)) error GetLogger() *zap.Logger GetMetricsScope() tally.Scope - RegisterSignalHandler(handler func(name string, input *commonpb.Payload)) - SignalExternalWorkflow(namespace, workflowID, runID, signalName string, input *commonpb.Payload, arg interface{}, childWorkflowOnly bool, callback resultHandler) - RegisterQueryHandler(handler func(queryType string, queryArgs *commonpb.Payload) (*commonpb.Payload, error)) + RegisterSignalHandler(handler func(name string, input *commonpb.Payloads)) + SignalExternalWorkflow(namespace, workflowID, runID, signalName string, input *commonpb.Payloads, arg interface{}, childWorkflowOnly bool, callback resultHandler) + RegisterQueryHandler(handler func(queryType string, queryArgs *commonpb.Payloads) (*commonpb.Payloads, error)) IsReplaying() bool MutableSideEffect(id string, f func() interface{}, equals func(a, b interface{}) bool) Value GetDataConverter() DataConverter @@ -98,7 +98,7 @@ type ( // WorkflowDefinition wraps the code that can execute a workflow. workflowDefinition interface { - Execute(env workflowEnvironment, header *commonpb.Header, input *commonpb.Payload) + Execute(env workflowEnvironment, header *commonpb.Header, input *commonpb.Payloads) // Called for each non timed out startDecision event. // Executed after all history events since the previous decision are applied to workflowDefinition OnDecisionTaskStarted() diff --git a/internal/internal_worker_interfaces_test.go b/internal/internal_worker_interfaces_test.go index 32fe35eb4..78fb5f96c 100644 --- a/internal/internal_worker_interfaces_test.go +++ b/internal/internal_worker_interfaces_test.go @@ -149,7 +149,7 @@ func (ga greeterActivity) ActivityType() ActivityType { return ActivityType{Name: activityName} } -func (ga greeterActivity) Execute(context.Context, *commonpb.Payload) (*commonpb.Payload, error) { +func (ga greeterActivity) Execute(context.Context, *commonpb.Payloads) (*commonpb.Payloads, error) { return DefaultDataConverter.ToData([]byte("World")) } diff --git a/internal/internal_worker_test.go b/internal/internal_worker_test.go index 8d8645114..6107b086c 100644 --- a/internal/internal_worker_test.go +++ b/internal/internal_worker_test.go @@ -142,7 +142,7 @@ func (s *internalWorkerTestSuite) TearDownTest() { s.mockCtrl.Finish() // assert mock’s expectations } -func (s *internalWorkerTestSuite) createLocalActivityMarkerDataForTest(activityID string) *commonpb.Payload { +func (s *internalWorkerTestSuite) createLocalActivityMarkerDataForTest(activityID string) *commonpb.Payloads { lamd := localActivityMarkerData{ ActivityID: activityID, ReplayTime: time.Now(), @@ -1331,7 +1331,7 @@ func _TestThriftEncoding(t *testing.T) { */ // Encode function args -func testEncodeFunctionArgs(dataConverter DataConverter, args ...interface{}) *commonpb.Payload { +func testEncodeFunctionArgs(dataConverter DataConverter, args ...interface{}) *commonpb.Payloads { input, err := encodeArgs(dataConverter, args) if err != nil { fmt.Println(err) diff --git a/internal/internal_workers_test.go b/internal/internal_workers_test.go index 4986927cb..b0ffbd0a9 100644 --- a/internal/internal_workers_test.go +++ b/internal/internal_workers_test.go @@ -488,7 +488,7 @@ func (s *WorkersTestSuite) TestMultipleLocalActivities() { s.Equal(2, localActivityCalledCount) } -func (s *WorkersTestSuite) createLocalActivityMarkerDataForTest(activityID string) *commonpb.Payload { +func (s *WorkersTestSuite) createLocalActivityMarkerDataForTest(activityID string) *commonpb.Payloads { lamd := localActivityMarkerData{ ActivityID: activityID, ReplayTime: time.Now(), diff --git a/internal/internal_workflow.go b/internal/internal_workflow.go index fa41fd0f2..05d2e5464 100644 --- a/internal/internal_workflow.go +++ b/internal/internal_workflow.go @@ -59,7 +59,7 @@ type ( } workflowResult struct { - workflowResult *commonpb.Payload + workflowResult *commonpb.Payloads error error } @@ -96,7 +96,7 @@ type ( // All time manipulation should use current time returned by GetTime(ctx) method. // Note that workflow.Context is used instead of context.Context to avoid use of raw channels. workflow interface { - Execute(ctx Context, input *commonpb.Payload) (result *commonpb.Payload, err error) + Execute(ctx Context, input *commonpb.Payloads) (result *commonpb.Payloads, err error) } sendCallback struct { @@ -176,7 +176,7 @@ type ( workflowID string waitForCancellation bool signalChannels map[string]Channel - queryHandlers map[string]func(*commonpb.Payload) (*commonpb.Payload, error) + queryHandlers map[string]func(*commonpb.Payloads) (*commonpb.Payloads, error) workflowIDReusePolicy WorkflowIDReusePolicy dataConverter DataConverter retryPolicy *commonpb.RetryPolicy @@ -190,11 +190,11 @@ type ( executeWorkflowParams struct { workflowOptions workflowType *WorkflowType - input *commonpb.Payload + input *commonpb.Payloads header *commonpb.Header - attempt int32 // used by test framework to support child workflow retry - scheduledTime time.Time // used by test framework to support child workflow retry - lastCompletionResult *commonpb.Payload // used by test framework to support cron + attempt int32 // used by test framework to support child workflow retry + scheduledTime time.Time // used by test framework to support child workflow retry + lastCompletionResult *commonpb.Payloads // used by test framework to support cron } // decodeFutureImpl @@ -308,8 +308,8 @@ func (f *futureImpl) Get(ctx Context, value interface{}) error { return errors.New("value parameter is not a pointer") } - if payload, ok := f.value.(*commonpb.Payload); ok { - if _, ok2 := value.(**commonpb.Payload); !ok2 { + if payload, ok := f.value.(*commonpb.Payloads); ok { + if _, ok2 := value.(**commonpb.Payloads); !ok2 { if err := decodeArg(getDataConverterFromWorkflowContext(ctx), payload, value); err != nil { return err } @@ -452,7 +452,7 @@ func newWorkflowInterceptors(env workflowEnvironment, factories []WorkflowInterc return interceptor, envInterceptor } -func (d *syncWorkflowDefinition) Execute(env workflowEnvironment, header *commonpb.Header, input *commonpb.Payload) { +func (d *syncWorkflowDefinition) Execute(env workflowEnvironment, header *commonpb.Header, input *commonpb.Payloads) { interceptors, envInterceptor := newWorkflowInterceptors(env, env.GetRegistry().getInterceptors()) dispatcher, rootCtx := newDispatcher(newWorkflowContext(env, interceptors, envInterceptor), func(ctx Context) { r := &workflowResult{} @@ -486,7 +486,7 @@ func (d *syncWorkflowDefinition) Execute(env workflowEnvironment, header *common d.cancel() }) - getWorkflowEnvironment(d.rootCtx).RegisterSignalHandler(func(name string, result *commonpb.Payload) { + getWorkflowEnvironment(d.rootCtx).RegisterSignalHandler(func(name string, result *commonpb.Payloads) { eo := getWorkflowEnvOptions(d.rootCtx) // We don't want this code to be blocked ever, using sendAsync(). ch := eo.getSignalChannel(d.rootCtx, name).(*channelImpl) @@ -496,7 +496,7 @@ func (d *syncWorkflowDefinition) Execute(env workflowEnvironment, header *common } }) - getWorkflowEnvironment(d.rootCtx).RegisterQueryHandler(func(queryType string, queryArgs *commonpb.Payload) (*commonpb.Payload, error) { + getWorkflowEnvironment(d.rootCtx).RegisterQueryHandler(func(queryType string, queryArgs *commonpb.Payloads) (*commonpb.Payloads, error) { eo := getWorkflowEnvOptions(d.rootCtx) handler, ok := eo.queryHandlers[queryType] if !ok { @@ -1124,7 +1124,7 @@ func newSyncWorkflowDefinition(workflow workflow) *syncWorkflowDefinition { return &syncWorkflowDefinition{workflow: workflow} } -func getValidatedWorkflowFunction(workflowFunc interface{}, args []interface{}, dataConverter DataConverter, r *registry) (*WorkflowType, *commonpb.Payload, error) { +func getValidatedWorkflowFunction(workflowFunc interface{}, args []interface{}, dataConverter DataConverter, r *registry) (*WorkflowType, *commonpb.Payloads, error) { fnName := "" fType := reflect.TypeOf(workflowFunc) switch getKind(fType) { @@ -1168,7 +1168,7 @@ func setWorkflowEnvOptionsIfNotExist(ctx Context) Context { newOptions = *options } else { newOptions.signalChannels = make(map[string]Channel) - newOptions.queryHandlers = make(map[string]func(*commonpb.Payload) (*commonpb.Payload, error)) + newOptions.queryHandlers = make(map[string]func(*commonpb.Payloads) (*commonpb.Payloads, error)) } if newOptions.dataConverter == nil { newOptions.dataConverter = getDefaultDataConverter() @@ -1196,7 +1196,7 @@ func getContextPropagatorsFromWorkflowContext(ctx Context) []ContextPropagator { func getHeadersFromContext(ctx Context) *commonpb.Header { header := &commonpb.Header{ - Fields: make(map[string]*commonpb.Payload), + Fields: make(map[string]*commonpb.Payloads), } contextPropagators := getContextPropagatorsFromWorkflowContext(ctx) for _, ctxProp := range contextPropagators { @@ -1245,7 +1245,7 @@ func (d *decodeFutureImpl) Get(ctx Context, value interface{}) error { return errors.New("value parameter is not a pointer") } dataConverter := getDataConverterFromWorkflowContext(ctx) - err := dataConverter.FromData(d.futureImpl.value.(*commonpb.Payload), value) + err := dataConverter.FromData(d.futureImpl.value.(*commonpb.Payloads), value) if err != nil { return err } @@ -1297,7 +1297,7 @@ func (h *queryHandler) validateHandlerFn() error { return nil } -func (h *queryHandler) execute(input *commonpb.Payload) (result *commonpb.Payload, err error) { +func (h *queryHandler) execute(input *commonpb.Payloads) (result *commonpb.Payloads, err error) { // if query handler panic, convert it to error defer func() { if p := recover(); p != nil { diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index d6d902208..30083f7c3 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -553,7 +553,7 @@ func (wc *WorkflowClient) CompleteActivity(ctx context.Context, taskToken []byte return errors.New("invalid task token provided") } - var data *commonpb.Payload + var data *commonpb.Payloads if result != nil { var err0 error data, err0 = encodeArg(wc.dataConverter, result) @@ -574,7 +574,7 @@ func (wc *WorkflowClient) CompleteActivityByID(ctx context.Context, namespace, w return errors.New("empty activity or workflow id or namespace") } - var data *commonpb.Payload + var data *commonpb.Payloads if result != nil { var err0 error data, err0 = encodeArg(wc.dataConverter, result) @@ -864,7 +864,7 @@ type QueryWorkflowWithOptionsResponse struct { // - EntityNotExistError // - QueryFailError func (wc *WorkflowClient) QueryWorkflowWithOptions(ctx context.Context, request *QueryWorkflowWithOptionsRequest) (*QueryWorkflowWithOptionsResponse, error) { - var input *commonpb.Payload + var input *commonpb.Payloads if len(request.Args) > 0 { var err error if input, err = encodeArgs(wc.dataConverter, request.Args); err != nil { @@ -952,7 +952,7 @@ func (wc *WorkflowClient) CloseConnection() error { func (wc *WorkflowClient) getWorkflowHeader(ctx context.Context) *commonpb.Header { header := &commonpb.Header{ - Fields: make(map[string]*commonpb.Payload), + Fields: make(map[string]*commonpb.Payloads), } writer := NewHeaderWriter(header) for _, ctxProp := range wc.contextPropagators { @@ -1133,7 +1133,7 @@ func getWorkflowMemo(input map[string]interface{}, dc DataConverter) (*commonpb. return nil, nil } - memo := make(map[string]*commonpb.Payload) + memo := make(map[string]*commonpb.Payloads) for k, v := range input { memoBytes, err := encodeArg(dc, v) if err != nil { @@ -1149,7 +1149,7 @@ func serializeSearchAttributes(input map[string]interface{}) (*commonpb.SearchAt return nil, nil } - attr := make(map[string]*commonpb.Payload) + attr := make(map[string]*commonpb.Payloads) for k, v := range input { attrBytes, err := getDefaultDataConverter().ToData(v) if err != nil { diff --git a/internal/internal_workflow_client_test.go b/internal/internal_workflow_client_test.go index 8cb09615e..ae774178e 100644 --- a/internal/internal_workflow_client_test.go +++ b/internal/internal_workflow_client_test.go @@ -119,7 +119,7 @@ func (s *stringMapPropagator) InjectFromWorkflow(ctx Context, writer HeaderWrite // Extract extracts values from headers and puts them into context func (s *stringMapPropagator) Extract(ctx context.Context, reader HeaderReader) (context.Context, error) { - if err := reader.ForEachKey(func(key string, value *commonpb.Payload) error { + if err := reader.ForEachKey(func(key string, value *commonpb.Payloads) error { if _, ok := s.keys[key]; ok { var decodedValue string err := DefaultDataConverter.FromData(value, &decodedValue) @@ -137,7 +137,7 @@ func (s *stringMapPropagator) Extract(ctx context.Context, reader HeaderReader) // ExtractToWorkflow extracts values from headers and puts them into context func (s *stringMapPropagator) ExtractToWorkflow(ctx Context, reader HeaderReader) (Context, error) { - if err := reader.ForEachKey(func(key string, value *commonpb.Payload) error { + if err := reader.ForEachKey(func(key string, value *commonpb.Payloads) error { if _, ok := s.keys[key]; ok { var decodedValue string err := DefaultDataConverter.FromData(value, &decodedValue) diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index 5cb2b9508..b1b7d70ec 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -28,13 +28,14 @@ import ( "context" "errors" "fmt" - decisionpb "go.temporal.io/temporal-proto/decision" - tasklistpb "go.temporal.io/temporal-proto/tasklist" "reflect" "strings" "sync" "time" + decisionpb "go.temporal.io/temporal-proto/decision" + tasklistpb "go.temporal.io/temporal-proto/tasklist" + "github.com/facebookgo/clock" "github.com/golang/mock/gomock" "github.com/opentracing/opentracing-go" @@ -84,7 +85,7 @@ type ( testActivityHandle struct { callback resultHandler activityType string - heartbeatDetails *commonpb.Payload + heartbeatDetails *commonpb.Payloads } testWorkflowHandle struct { @@ -184,8 +185,8 @@ type ( openSessions map[string]*SessionInfo workflowCancelHandler func() - signalHandler func(name string, input *commonpb.Payload) - queryHandler func(string, *commonpb.Payload) (*commonpb.Payload, error) + signalHandler func(name string, input *commonpb.Payloads) + queryHandler func(string, *commonpb.Payloads) (*commonpb.Payloads, error) startedHandler func(r WorkflowExecution, e error) isTestCompleted bool @@ -196,7 +197,7 @@ type ( dataConverter DataConverter runTimeout time.Duration - heartbeatDetails *commonpb.Payload + heartbeatDetails *commonpb.Payloads workerStopChannel chan struct{} sessionEnvironment *testSessionEnvironmentImpl @@ -262,7 +263,7 @@ func newTestWorkflowEnvironmentImpl(s *WorkflowTestSuite, parentRegistry *regist env.setStartTime(time.Now()) // put current workflow as a running workflow so child can send signal to parent - env.runningWorkflows[env.workflowInfo.WorkflowExecution.ID] = &testWorkflowHandle{env: env, callback: func(result *commonpb.Payload, err error) {}} + env.runningWorkflows[env.workflowInfo.WorkflowExecution.ID] = &testWorkflowHandle{env: env, callback: func(result *commonpb.Payloads, err error) {}} if env.logger == nil { logger, _ := zap.NewDevelopment() @@ -431,7 +432,7 @@ func (env *testWorkflowEnvironmentImpl) executeWorkflow(workflowFn interface{}, env.executeWorkflowInternal(0, workflowType.Name, input) } -func (env *testWorkflowEnvironmentImpl) executeWorkflowInternal(delayStart time.Duration, workflowType string, input *commonpb.Payload) { +func (env *testWorkflowEnvironmentImpl) executeWorkflowInternal(delayStart time.Duration, workflowType string, input *commonpb.Payloads) { env.locker.Lock() wInfo := env.workflowInfo if wInfo.WorkflowType.Name != workflowTypeNotSpecified { @@ -660,7 +661,7 @@ func (env *testWorkflowEnvironmentImpl) startMainLoop() { } func (env *testWorkflowEnvironmentImpl) registerDelayedCallback(f func(), delayDuration time.Duration) { - timerCallback := func(result *commonpb.Payload, err error) { + timerCallback := func(result *commonpb.Payloads, err error) { f() } if delayDuration == 0 { @@ -793,7 +794,7 @@ func (env *testWorkflowEnvironmentImpl) RequestCancelTimer(timerID string) { }, true) } -func (env *testWorkflowEnvironmentImpl) Complete(result *commonpb.Payload, err error) { +func (env *testWorkflowEnvironmentImpl) Complete(result *commonpb.Payloads, err error) { if env.isTestCompleted { env.logger.Debug("Workflow already completed.") return @@ -855,8 +856,8 @@ func (h *testWorkflowHandle) rerunAsChild() bool { params := h.params // pass down the last completion result - var result *commonpb.Payload - // TODO: convert env.testResult to *commonpb.Payload + var result *commonpb.Payloads + // TODO: convert env.testResult to *commonpb.Payloads if ev, ok := env.testResult.(*EncodedValue); ev != nil && ok { result = ev.value } @@ -908,7 +909,7 @@ func (env *testWorkflowEnvironmentImpl) CompleteActivity(taskToken []byte, resul if taskToken == nil { return errors.New("nil task token provided") } - var data *commonpb.Payload + var data *commonpb.Payloads if result != nil { var encodeErr error data, encodeErr = encodeArg(env.GetDataConverter(), result) @@ -1270,7 +1271,7 @@ func (env *testWorkflowEnvironmentImpl) ExecuteLocalActivity(params executeLocal aew := &activityExecutorWrapper{activityExecutor: ae, env: env} // substitute the local activity function so we could replace with mock if it is supplied. - params.ActivityFn = func(ctx context.Context, inputArgs ...interface{}) (*commonpb.Payload, error) { + params.ActivityFn = func(ctx context.Context, inputArgs ...interface{}) (*commonpb.Payloads, error) { return aew.ExecuteWithActualArgs(ctx, params.InputArgs) } @@ -1332,7 +1333,7 @@ func (env *testWorkflowEnvironmentImpl) handleActivityResult(activityID string, delete(env.activities, activityID) - var blob *commonpb.Payload + var blob *commonpb.Payloads var err error switch request := result.(type) { @@ -1436,7 +1437,7 @@ func (env *testWorkflowEnvironmentImpl) runBeforeMockCallReturns(call *MockCallW } // Execute executes the activity code. -func (a *activityExecutorWrapper) Execute(ctx context.Context, input *commonpb.Payload) (*commonpb.Payload, error) { +func (a *activityExecutorWrapper) Execute(ctx context.Context, input *commonpb.Payloads) (*commonpb.Payloads, error) { activityInfo := GetActivityInfo(ctx) dc := getDataConverterFromActivityCtx(ctx) if a.env.onActivityStartedListener != nil { @@ -1457,7 +1458,7 @@ func (a *activityExecutorWrapper) Execute(ctx context.Context, input *commonpb.P } // ExecuteWithActualArgs executes the activity code. -func (a *activityExecutorWrapper) ExecuteWithActualArgs(ctx context.Context, inputArgs []interface{}) (*commonpb.Payload, error) { +func (a *activityExecutorWrapper) ExecuteWithActualArgs(ctx context.Context, inputArgs []interface{}) (*commonpb.Payloads, error) { activityInfo := GetActivityInfo(ctx) if a.env.onLocalActivityStartedListener != nil { waitCh := make(chan struct{}) @@ -1477,7 +1478,7 @@ func (a *activityExecutorWrapper) ExecuteWithActualArgs(ctx context.Context, inp } // Execute executes the workflow code. -func (w *workflowExecutorWrapper) Execute(ctx Context, input *commonpb.Payload) (result *commonpb.Payload, err error) { +func (w *workflowExecutorWrapper) Execute(ctx Context, input *commonpb.Payloads) (result *commonpb.Payloads, err error) { env := w.env if env.isChildWorkflow() && env.onChildWorkflowStartedListener != nil { env.onChildWorkflowStartedListener(GetWorkflowInfo(ctx), ctx, newEncodedValues(input, w.env.GetDataConverter())) @@ -1553,7 +1554,7 @@ func (m *mockWrapper) getCtxArg(ctx interface{}) []interface{} { return nil } -func (m *mockWrapper) getMockReturn(ctx interface{}, input *commonpb.Payload) (retArgs mock.Arguments) { +func (m *mockWrapper) getMockReturn(ctx interface{}, input *commonpb.Payloads) (retArgs mock.Arguments) { if _, ok := m.env.expectedMockCalls[m.name]; !ok { // no mock return nil @@ -1608,7 +1609,7 @@ func (m *mockWrapper) getMockFn(mockRet mock.Arguments) interface{} { return nil } -func (m *mockWrapper) getMockValue(mockRet mock.Arguments) (*commonpb.Payload, error) { +func (m *mockWrapper) getMockValue(mockRet mock.Arguments) (*commonpb.Payloads, error) { fnName := m.name mockRetLen := len(mockRet) fnType := reflect.TypeOf(m.fn) @@ -1661,7 +1662,7 @@ func (m *mockWrapper) getMockValue(mockRet mock.Arguments) (*commonpb.Payload, e } } -func (m *mockWrapper) executeMock(ctx interface{}, input *commonpb.Payload, mockRet mock.Arguments) (result *commonpb.Payload, err error) { +func (m *mockWrapper) executeMock(ctx interface{}, input *commonpb.Payloads, mockRet mock.Arguments) (result *commonpb.Payloads, err error) { // have to handle panics here to support calling ExecuteChildWorkflow(...).GetChildWorkflowExecution().Get(...) // when a child is mocked. defer func() { @@ -1686,7 +1687,7 @@ func (m *mockWrapper) executeMock(ctx interface{}, input *commonpb.Payload, mock return m.getMockValue(mockRet) } -func (m *mockWrapper) executeMockWithActualArgs(ctx interface{}, inputArgs []interface{}, mockRet mock.Arguments) (*commonpb.Payload, error) { +func (m *mockWrapper) executeMockWithActualArgs(ctx interface{}, inputArgs []interface{}, mockRet mock.Arguments) (*commonpb.Payloads, error) { fnName := m.name // check if mock returns function which must match to the actual function. if mockFn := m.getMockFn(mockRet); mockFn != nil { @@ -1850,11 +1851,11 @@ func (env *testWorkflowEnvironmentImpl) RegisterCancelHandler(handler func()) { env.workflowCancelHandler = handler } -func (env *testWorkflowEnvironmentImpl) RegisterSignalHandler(handler func(name string, input *commonpb.Payload)) { +func (env *testWorkflowEnvironmentImpl) RegisterSignalHandler(handler func(name string, input *commonpb.Payloads)) { env.signalHandler = handler } -func (env *testWorkflowEnvironmentImpl) RegisterQueryHandler(handler func(string, *commonpb.Payload) (*commonpb.Payload, error)) { +func (env *testWorkflowEnvironmentImpl) RegisterQueryHandler(handler func(string, *commonpb.Payloads) (*commonpb.Payloads, error)) { env.queryHandler = handler } @@ -1862,7 +1863,7 @@ func (env *testWorkflowEnvironmentImpl) RequestCancelChildWorkflow(_, workflowID if childHandle, ok := env.runningWorkflows[workflowID]; ok && !childHandle.handled { // current workflow is a parent workflow, and we are canceling a child workflow childEnv := childHandle.env - childEnv.cancelWorkflow(func(result *commonpb.Payload, err error) {}) + childEnv.cancelWorkflow(func(result *commonpb.Payloads, err error) {}) return } } @@ -1919,7 +1920,7 @@ func (env *testWorkflowEnvironmentImpl) IsReplaying() bool { return false } -func (env *testWorkflowEnvironmentImpl) SignalExternalWorkflow(namespace, workflowID, runID, signalName string, input *commonpb.Payload, arg interface{}, childWorkflowOnly bool, callback resultHandler) { +func (env *testWorkflowEnvironmentImpl) SignalExternalWorkflow(namespace, workflowID, runID, signalName string, input *commonpb.Payloads, arg interface{}, childWorkflowOnly bool, callback resultHandler) { // check if target workflow is a known workflow if childHandle, ok := env.runningWorkflows[workflowID]; ok { // target workflow is a child @@ -1986,7 +1987,7 @@ func (env *testWorkflowEnvironmentImpl) executeChildWorkflowWithDelay(delayStart return nil } -func (env *testWorkflowEnvironmentImpl) SideEffect(f func() (*commonpb.Payload, error), callback resultHandler) { +func (env *testWorkflowEnvironmentImpl) SideEffect(f func() (*commonpb.Payloads, error), callback resultHandler) { callback(f()) } @@ -2075,7 +2076,7 @@ func (env *testWorkflowEnvironmentImpl) RemoveSession(sessionID string) { delete(env.openSessions, sessionID) } -func (env *testWorkflowEnvironmentImpl) encodeValue(value interface{}) *commonpb.Payload { +func (env *testWorkflowEnvironmentImpl) encodeValue(value interface{}) *commonpb.Payloads { blob, err := env.GetDataConverter().ToData(value) if err != nil { panic(err) diff --git a/internal/internal_workflow_testsuite_test.go b/internal/internal_workflow_testsuite_test.go index c2cbbf644..644a8e43c 100644 --- a/internal/internal_workflow_testsuite_test.go +++ b/internal/internal_workflow_testsuite_test.go @@ -61,7 +61,7 @@ func (s *WorkflowTestSuiteUnitTest) SetupSuite() { ScheduleToCloseTimeout: 3 * time.Second, } s.header = &commonpb.Header{ - Fields: map[string]*commonpb.Payload{"test": encodeString(s.T(), "test-data")}, + Fields: map[string]*commonpb.Payloads{"test": encodeString(s.T(), "test-data")}, } s.contextPropagators = []ContextPropagator{NewStringMapPropagator([]string{"test"})} } @@ -387,7 +387,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_ActivityWithHeaderContext() { } s.SetHeader(&commonpb.Header{ - Fields: map[string]*commonpb.Payload{ + Fields: map[string]*commonpb.Payloads{ testHeader: encodeString(s.T(), "test-data"), }, }) @@ -1422,31 +1422,31 @@ func (s *WorkflowTestSuiteUnitTest) Test_ActivityWithPointerTypes() { func (s *WorkflowTestSuiteUnitTest) Test_ActivityWithProtoPayload() { var actualValues []string - activitySingleFn := func(ctx context.Context, wf1 commonpb.Payload, wf2 *commonpb.Payload) (commonpb.Payload, error) { - actualValues = append(actualValues, string(wf1.GetItems()[0].GetData())) - actualValues = append(actualValues, string(wf1.GetItems()[0].GetMetadata()[metadataEncoding])) - actualValues = append(actualValues, string(wf2.GetItems()[0].GetData())) + activitySingleFn := func(ctx context.Context, wf1 commonpb.Payloads, wf2 *commonpb.Payloads) (commonpb.Payloads, error) { + actualValues = append(actualValues, string(wf1.GetPayloads()[0].GetData())) + actualValues = append(actualValues, string(wf1.GetPayloads()[0].GetMetadata()[metadataEncoding])) + actualValues = append(actualValues, string(wf2.GetPayloads()[0].GetData())) - // If return type is *commonpb.Payload it will be automatically unwrpped (this is side effect of internal impementation). - // commonpb.Payload type is returned as is. - return commonpb.Payload{Items: []*commonpb.PayloadItem{{Data: []byte("result")}}}, nil + // If return type is *commonpb.Payloads it will be automatically unwrpped (this is side effect of internal impementation). + // commonpb.Payloads type is returned as is. + return commonpb.Payloads{Payloads: []*commonpb.Payload{{Data: []byte("result")}}}, nil } - input1 := commonpb.Payload{Items: []*commonpb.PayloadItem{{ // This will be JSON + input1 := commonpb.Payloads{Payloads: []*commonpb.Payload{{ // This will be JSON Metadata: map[string][]byte{ metadataEncoding: []byte("someencoding"), }, Data: []byte("input1")}}} - input2 := &commonpb.Payload{Items: []*commonpb.PayloadItem{{Data: []byte("input2")}}} + input2 := &commonpb.Payloads{Payloads: []*commonpb.Payload{{Data: []byte("input2")}}} env := s.NewTestActivityEnvironment() env.RegisterActivity(activitySingleFn) payload, err := env.ExecuteActivity(activitySingleFn, input1, input2) s.NoError(err) s.EqualValues([]string{"input1", "someencoding", "input2"}, actualValues) - var ret commonpb.Payload + var ret commonpb.Payloads _ = payload.Get(&ret) - s.Equal(commonpb.Payload{Items: []*commonpb.PayloadItem{{Data: []byte("result")}}}, ret) + s.Equal(commonpb.Payloads{Payloads: []*commonpb.Payload{{Data: []byte("result")}}}, ret) } func (s *WorkflowTestSuiteUnitTest) Test_ActivityWithRandomProto() { @@ -1609,7 +1609,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_WorkflowHeaderContext() { s.SetContextPropagators([]ContextPropagator{NewStringMapPropagator([]string{testHeader})}) s.SetHeader(&commonpb.Header{ - Fields: map[string]*commonpb.Payload{ + Fields: map[string]*commonpb.Payloads{ testHeader: encodeString(s.T(), "test-data"), }, }) diff --git a/internal/session.go b/internal/session.go index ea3a1ca7c..88f093853 100644 --- a/internal/session.go +++ b/internal/session.go @@ -418,7 +418,7 @@ func sessionCreationActivity(ctx context.Context, sessionID string) error { sessionEnv.CompleteSession(sessionID) return ctx.Err() case <-ticker.C: - err := activityEnv.serviceInvoker.Heartbeat(&commonpb.Payload{}) + err := activityEnv.serviceInvoker.Heartbeat(&commonpb.Payloads{}) if err != nil { sessionEnv.CompleteSession(sessionID) return err diff --git a/internal/tracer.go b/internal/tracer.go index 83c713be5..16ae5a1af 100644 --- a/internal/tracer.go +++ b/internal/tracer.go @@ -40,7 +40,7 @@ type tracingReader struct { var _ opentracing.TextMapReader = (*tracingReader)(nil) func (t tracingReader) ForeachKey(handler func(key, val string) error) error { - return t.reader.ForEachKey(func(k string, v *commonpb.Payload) error { + return t.reader.ForEachKey(func(k string, v *commonpb.Payloads) error { var decodedValue string err := DefaultDataConverter.FromData(v, &decodedValue) if err != nil { diff --git a/internal/tracer_test.go b/internal/tracer_test.go index ef79dbbbe..76ce0d2ef 100644 --- a/internal/tracer_test.go +++ b/internal/tracer_test.go @@ -47,7 +47,7 @@ func TestTracingContextPropagator(t *testing.T) { ctx := context.Background() ctx = opentracing.ContextWithSpan(ctx, span) header := &commonpb.Header{ - Fields: map[string]*commonpb.Payload{}, + Fields: map[string]*commonpb.Payloads{}, } err = ctxProp.Inject(ctx, NewHeaderWriter(header)) @@ -66,7 +66,7 @@ func TestTracingContextPropagatorNoSpan(t *testing.T) { ctxProp := NewTracingContextPropagator(zap.NewNop(), opentracing.NoopTracer{}) header := &commonpb.Header{ - Fields: map[string]*commonpb.Payload{}, + Fields: map[string]*commonpb.Payloads{}, } err := ctxProp.Inject(context.Background(), NewHeaderWriter(header)) require.NoError(t, err) @@ -87,7 +87,7 @@ func TestTracingContextPropagatorWorkflowContext(t *testing.T) { assert.NotNil(t, span.Context()) ctx := contextWithSpan(Background(), span.Context()) header := &commonpb.Header{ - Fields: map[string]*commonpb.Payload{}, + Fields: map[string]*commonpb.Payloads{}, } err = ctxProp.InjectFromWorkflow(ctx, NewHeaderWriter(header)) @@ -111,7 +111,7 @@ func TestTracingContextPropagatorWorkflowContextNoSpan(t *testing.T) { ctxProp := NewTracingContextPropagator(zap.NewNop(), opentracing.NoopTracer{}) header := &commonpb.Header{ - Fields: map[string]*commonpb.Payload{}, + Fields: map[string]*commonpb.Payloads{}, } err := ctxProp.InjectFromWorkflow(Background(), NewHeaderWriter(header)) require.NoError(t, err) diff --git a/internal/workflow.go b/internal/workflow.go index 60a52a536..562dd5932 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -160,7 +160,7 @@ type ( // EncodedValue is type alias used to encapsulate/extract encoded result from workflow/activity. EncodedValue struct { - value *commonpb.Payload + value *commonpb.Payloads dataConverter DataConverter } // Version represents a change version. See GetVersion call. @@ -451,7 +451,7 @@ func (wc *workflowEnvironmentInterceptor) ExecuteActivity(ctx Context, typeName ctxDone, cancellable := ctx.Done().(*channelImpl) cancellationCallback := &receiveCallback{} - a := getWorkflowEnvironment(ctx).ExecuteActivity(params, func(r *commonpb.Payload, e error) { + a := getWorkflowEnvironment(ctx).ExecuteActivity(params, func(r *commonpb.Payloads, e error) { settable.Set(r, e) if cancellable { // future is done, we don't need the cancellation callback anymore. @@ -543,7 +543,7 @@ func (wc *workflowEnvironmentInterceptor) ExecuteLocalActivity(ctx Context, acti Go(ctx, func(ctx Context) { for { f := wc.scheduleLocalActivity(ctx, params) - var result *commonpb.Payload + var result *commonpb.Payloads err := f.Get(ctx, &result) if retryErr, ok := err.(*needRetryError); ok && retryErr.Backoff > 0 { // Backoff for retry @@ -664,7 +664,7 @@ func (wc *workflowEnvironmentInterceptor) ExecuteChildWorkflow(ctx Context, chil ctxDone, cancellable := ctx.Done().(*channelImpl) cancellationCallback := &receiveCallback{} - err = getWorkflowEnvironment(ctx).ExecuteChildWorkflow(params, func(r *commonpb.Payload, e error) { + err = getWorkflowEnvironment(ctx).ExecuteChildWorkflow(params, func(r *commonpb.Payloads, e error) { mainSettable.Set(r, e) if cancellable { // future is done, we don't need cancellation anymore @@ -702,7 +702,7 @@ func (wc *workflowEnvironmentInterceptor) ExecuteChildWorkflow(ctx Context, chil func getWorkflowHeader(ctx Context, ctxProps []ContextPropagator) *commonpb.Header { header := &commonpb.Header{ - Fields: make(map[string]*commonpb.Payload), + Fields: make(map[string]*commonpb.Payloads), } writer := NewHeaderWriter(header) for _, ctxProp := range ctxProps { @@ -721,7 +721,7 @@ type WorkflowInfo struct { WorkflowTaskTimeoutSeconds int32 Namespace string Attempt int32 // Attempt starts from 0 and increased by 1 for every retry if retry policy is specified. - lastCompletionResult *commonpb.Payload + lastCompletionResult *commonpb.Payloads CronSchedule string ContinuedExecutionRunID string ParentWorkflowNamespace string @@ -792,7 +792,7 @@ func (wc *workflowEnvironmentInterceptor) NewTimer(ctx Context, d time.Duration) ctxDone, cancellable := ctx.Done().(*channelImpl) cancellationCallback := &receiveCallback{} - t := wc.env.NewTimer(d, func(r *commonpb.Payload, e error) { + t := wc.env.NewTimer(d, func(r *commonpb.Payloads, e error) { settable.Set(nil, e) if cancellable { // future is done, we don't need cancellation anymore @@ -857,7 +857,7 @@ func (wc *workflowEnvironmentInterceptor) RequestCancelExternalWorkflow(ctx Cont return future } - resultCallback := func(result *commonpb.Payload, err error) { + resultCallback := func(result *commonpb.Payloads, err error) { settable.Set(result, err) } @@ -906,7 +906,7 @@ func signalExternalWorkflow(ctx Context, workflowID, runID, signalName string, a return future } - resultCallback := func(result *commonpb.Payload, err error) { + resultCallback := func(result *commonpb.Payloads, err error) { settable.Set(result, err) } env.SignalExternalWorkflow( @@ -1057,7 +1057,7 @@ func (wc *workflowEnvironmentInterceptor) GetSignalChannel(ctx Context, signalNa return getWorkflowEnvOptions(ctx).getSignalChannel(ctx, signalName) } -func newEncodedValue(value *commonpb.Payload, dc DataConverter) Value { +func newEncodedValue(value *commonpb.Payloads, dc DataConverter) Value { if dc == nil { dc = getDefaultDataConverter() } @@ -1121,11 +1121,11 @@ func SideEffect(ctx Context, f func(ctx Context) interface{}) Value { func (wc *workflowEnvironmentInterceptor) SideEffect(ctx Context, f func(ctx Context) interface{}) Value { dc := getDataConverterFromWorkflowContext(ctx) future, settable := NewFuture(ctx) - wrapperFunc := func() (*commonpb.Payload, error) { + wrapperFunc := func() (*commonpb.Payloads, error) { r := f(ctx) return encodeArg(dc, r) } - resultCallback := func(result *commonpb.Payload, err error) { + resultCallback := func(result *commonpb.Payloads, err error) { settable.Set(EncodedValue{result, dc}, err) } wc.env.SideEffect(wrapperFunc, resultCallback) diff --git a/internal/workflow_testsuite.go b/internal/workflow_testsuite.go index 3d32ee891..bcd068865 100644 --- a/internal/workflow_testsuite.go +++ b/internal/workflow_testsuite.go @@ -42,7 +42,7 @@ import ( type ( // EncodedValues is a type alias used to encapsulate/extract encoded arguments from workflow/activity. EncodedValues struct { - values *commonpb.Payload + values *commonpb.Payloads dataConverter DataConverter } @@ -78,7 +78,7 @@ type ( } ) -func newEncodedValues(values *commonpb.Payload, dc DataConverter) Values { +func newEncodedValues(values *commonpb.Payloads, dc DataConverter) Values { if dc == nil { dc = getDefaultDataConverter() } @@ -665,7 +665,7 @@ func (t *TestWorkflowEnvironment) CompleteActivity(taskToken []byte, result inte // CancelWorkflow requests cancellation (through workflow Context) to the currently running test workflow. func (t *TestWorkflowEnvironment) CancelWorkflow() { - t.impl.cancelWorkflow(func(result *commonpb.Payload, err error) {}) + t.impl.cancelWorkflow(func(result *commonpb.Payloads, err error) {}) } // SignalWorkflow sends signal to the currently running test workflow.