Skip to content

Commit

Permalink
Rename Payload to Payloads (#115)
Browse files Browse the repository at this point in the history
* Rename Payload to Payloads.

* Update go.temporal.io/temporal-proto.
  • Loading branch information
alexshtin authored Apr 30, 2020
1 parent 3284084 commit 6bcd477
Show file tree
Hide file tree
Showing 34 changed files with 240 additions and 241 deletions.
4 changes: 2 additions & 2 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ var _ internal.NamespaceClient = NamespaceClient(nil)
// which can be decoded by using:
// var result string // This need to be same type as the one passed to RecordHeartbeat
// NewValue(data).Get(&result)
func NewValue(data *commonpb.Payload) encoded.Value {
func NewValue(data *commonpb.Payloads) encoded.Value {
return internal.NewValue(data)
}

Expand All @@ -447,6 +447,6 @@ func NewValue(data *commonpb.Payload) encoded.Value {
// var result1 string
// var result2 int // These need to be same type as those arguments passed to RecordHeartbeat
// NewValues(data).Get(&result1, &result2)
func NewValues(data *commonpb.Payload) encoded.Values {
func NewValues(data *commonpb.Payloads) encoded.Values {
return internal.NewValues(data)
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/uber-go/tally v3.3.15+incompatible
github.com/uber/jaeger-client-go v2.22.1+incompatible
github.com/uber/jaeger-lib v2.2.0+incompatible // indirect
go.temporal.io/temporal-proto v0.20.29
go.temporal.io/temporal-proto v0.20.30
go.uber.org/atomic v1.6.0
go.uber.org/goleak v1.0.0
go.uber.org/zap v1.14.1
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,8 @@ github.com/uber/jaeger-client-go v2.22.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMW
github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw=
github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.temporal.io/temporal-proto v0.20.28 h1:syCkMj1bBEqPCj4dKmPQksBKH3qXeZ+PayfsBMdQEOY=
go.temporal.io/temporal-proto v0.20.28/go.mod h1:Lv8L8YBpbp0Z7V5nbvw5UD0j7x0isebhCOIDLkBqn6s=
go.temporal.io/temporal-proto v0.20.29 h1:YDKcU0qxThs9ihny93Pf2/gSdGvybKjKuCIosy54nQ8=
go.temporal.io/temporal-proto v0.20.29/go.mod h1:Lv8L8YBpbp0Z7V5nbvw5UD0j7x0isebhCOIDLkBqn6s=
go.temporal.io/temporal-proto v0.20.30 h1:QxvCfTZ1U686bmlMPTTg0F/dvMvt02m7i3jF3zWEX/E=
go.temporal.io/temporal-proto v0.20.30/go.mod h1:Lv8L8YBpbp0Z7V5nbvw5UD0j7x0isebhCOIDLkBqn6s=
go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/goleak v1.0.0 h1:qsup4IcBdlmsnGfqyLl4Ntn3C2XCCuKAE7DwHpScyUo=
Expand Down
4 changes: 2 additions & 2 deletions internal/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func RecordActivityHeartbeat(ctx context.Context, details ...interface{}) {
// no-op for local activity
return
}
var data *commonpb.Payload
var data *commonpb.Payloads
var err error
// We would like to be a able to pass in "nil" as part of details(that is no progress to report to)
if len(details) > 1 || (len(details) == 1 && details[0] != nil) {
Expand All @@ -226,7 +226,7 @@ func RecordActivityHeartbeat(ctx context.Context, details ...interface{}) {
// Implement to unit test activities.
type ServiceInvoker interface {
// Returns ActivityTaskCanceledError if activity is cancelled
Heartbeat(details *commonpb.Payload) error
Heartbeat(details *commonpb.Payloads) error
Close(flushBufferedHeartbeat bool)
GetClient(namespace string, options ClientOptions) Client
}
Expand Down
4 changes: 2 additions & 2 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,7 @@ func (p ParentClosePolicy) toProto() commonpb.ParentClosePolicy {
// which can be decoded by using:
// var result string // This need to be same type as the one passed to RecordHeartbeat
// NewValue(data).Get(&result)
func NewValue(data *commonpb.Payload) Value {
func NewValue(data *commonpb.Payloads) Value {
return newEncodedValue(data, nil)
}

Expand All @@ -695,6 +695,6 @@ func NewValue(data *commonpb.Payload) Value {
// var result1 string
// var result2 int // These need to be same type as those arguments passed to RecordHeartbeat
// NewValues(data).Get(&result1, &result2)
func NewValues(data *commonpb.Payload) Values {
func NewValues(data *commonpb.Payloads) Values {
return newEncodedValues(data, nil)
}
30 changes: 15 additions & 15 deletions internal/encoded.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ type (
// 2. Activity/Workflow worker that run these activity/childWorkflow, through cleint.Options.
DataConverter interface {
// ToData implements conversion of a list of values.
ToData(value ...interface{}) (*commonpb.Payload, error)
ToData(value ...interface{}) (*commonpb.Payloads, error)
// FromData implements conversion of an array of values of different types.
// Useful for deserializing arguments of function invocations.
FromData(input *commonpb.Payload, valuePtr ...interface{}) error
FromData(input *commonpb.Payloads, valuePtr ...interface{}) error
}

// defaultDataConverter uses JSON.
Expand Down Expand Up @@ -107,22 +107,22 @@ func getDefaultDataConverter() DataConverter {
return DefaultDataConverter
}

func (dc *defaultDataConverter) ToData(values ...interface{}) (*commonpb.Payload, error) {
func (dc *defaultDataConverter) ToData(values ...interface{}) (*commonpb.Payloads, error) {
if len(values) == 0 {
return nil, nil
}

payload := &commonpb.Payload{}
result := &commonpb.Payloads{}
for i, value := range values {
nvp, ok := value.(NameValuePair)
if !ok {
nvp.Name = fmt.Sprintf("values[%d]", i)
nvp.Value = value
}

var payloadItem *commonpb.PayloadItem
var payload *commonpb.Payload
if bytes, isByteSlice := nvp.Value.([]byte); isByteSlice {
payloadItem = &commonpb.PayloadItem{
payload = &commonpb.Payload{
Metadata: map[string][]byte{
metadataEncoding: []byte(metadataEncodingRaw),
metadataName: []byte(nvp.Name),
Expand All @@ -134,31 +134,31 @@ func (dc *defaultDataConverter) ToData(values ...interface{}) (*commonpb.Payload
if err != nil {
return nil, fmt.Errorf("%s: %w: %v", nvp.Name, ErrUnableToEncodeJSON, err)
}
payloadItem = &commonpb.PayloadItem{
payload = &commonpb.Payload{
Metadata: map[string][]byte{
metadataEncoding: []byte(metadataEncodingJSON),
metadataName: []byte(nvp.Name),
},
Data: data,
}
}
payload.Items = append(payload.Items, payloadItem)
result.Payloads = append(result.Payloads, payload)
}

return payload, nil
return result, nil
}

func (dc *defaultDataConverter) FromData(payload *commonpb.Payload, valuePtrs ...interface{}) error {
if payload == nil {
func (dc *defaultDataConverter) FromData(payloads *commonpb.Payloads, valuePtrs ...interface{}) error {
if payloads == nil {
return nil
}

for i, payloadItem := range payload.GetItems() {
for i, payload := range payloads.GetPayloads() {
if i >= len(valuePtrs) {
break
}

metadata := payloadItem.GetMetadata()
metadata := payload.GetMetadata()
if metadata == nil {
return fmt.Errorf("payload item %d: %w", i, ErrMetadataIsNotSet)
}
Expand All @@ -183,9 +183,9 @@ func (dc *defaultDataConverter) FromData(payload *commonpb.Payload, valuePtrs ..
if !valueBytes.CanSet() {
return fmt.Errorf("%s: %w", name, ErrUnableToSetBytes)
}
valueBytes.SetBytes(payloadItem.GetData())
valueBytes.SetBytes(payload.GetData())
case metadataEncodingJSON:
err := json.Unmarshal(payloadItem.GetData(), valuePtrs[i])
err := json.Unmarshal(payload.GetData(), valuePtrs[i])
if err != nil {
return fmt.Errorf("%s: %w: %v", name, ErrUnableToDecodeJSON, err)
}
Expand Down
18 changes: 9 additions & 9 deletions internal/encoded_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ func newTestDataConverter() DataConverter {
return &testDataConverter{}
}

func (dc *testDataConverter) ToData(values ...interface{}) (*commonpb.Payload, error) {
payload := &commonpb.Payload{}
func (dc *testDataConverter) ToData(values ...interface{}) (*commonpb.Payloads, error) {
result := &commonpb.Payloads{}

for i, arg := range values {
var buf bytes.Buffer
Expand All @@ -114,30 +114,30 @@ func (dc *testDataConverter) ToData(values ...interface{}) (*commonpb.Payload, e
return nil, fmt.Errorf("values[%d]: %w: %v", i, ErrUnableToEncodeGob, err)
}

payloadItem := &commonpb.PayloadItem{
payload := &commonpb.Payload{
Metadata: map[string][]byte{
metadataEncoding: []byte(metadataEncodingGob),
metadataName: []byte(fmt.Sprintf("args[%d]", i)),
},
Data: buf.Bytes(),
}
payload.Items = append(payload.Items, payloadItem)
result.Payloads = append(result.Payloads, payload)
}

return payload, nil
return result, nil
}

func (dc *testDataConverter) FromData(payload *commonpb.Payload, valuePtrs ...interface{}) error {
for i, payloadItem := range payload.GetItems() {
encoding, ok := payloadItem.GetMetadata()[metadataEncoding]
func (dc *testDataConverter) FromData(payloads *commonpb.Payloads, valuePtrs ...interface{}) error {
for i, payload := range payloads.GetPayloads() {
encoding, ok := payload.GetMetadata()[metadataEncoding]

if !ok {
return fmt.Errorf("args[%d]: %w", i, ErrEncodingIsNotSet)
}

e := string(encoding)
if e == metadataEncodingGob {
dec := gob.NewDecoder(bytes.NewBuffer(payloadItem.GetData()))
dec := gob.NewDecoder(bytes.NewBuffer(payload.GetData()))
if err := dec.Decode(valuePtrs[i]); err != nil {
return fmt.Errorf("args[%d]: %w: %v", i, ErrUnableToDecodeGob, err)
}
Expand Down
6 changes: 3 additions & 3 deletions internal/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func testTimeoutErrorDetails(t *testing.T, timeoutType eventpb.TimeoutType) {
&decisionpb.ScheduleActivityTaskDecisionAttributes{ActivityId: activityID})
di.state = decisionStateInitiated
di.setData(&scheduledActivity{
callback: func(r *commonpb.Payload, e error) {
callback: func(r *commonpb.Payloads, e error) {
actualErr = e
},
})
Expand Down Expand Up @@ -414,7 +414,7 @@ func Test_SignalExternalWorkflowExecutionFailedError(t *testing.T) {
)
di.state = decisionStateInitiated
di.setData(&scheduledSignal{
callback: func(r *commonpb.Payload, e error) {
callback: func(r *commonpb.Payloads, e error) {
actualErr = e
},
})
Expand All @@ -441,7 +441,7 @@ func Test_ContinueAsNewError(t *testing.T) {
headerValue, err := DefaultDataConverter.ToData("test-data")
assert.NoError(t, err)
header := &commonpb.Header{
Fields: map[string]*commonpb.Payload{"test": headerValue},
Fields: map[string]*commonpb.Payloads{"test": headerValue},
}

s := &WorkflowTestSuite{
Expand Down
10 changes: 5 additions & 5 deletions internal/headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ import (

// HeaderWriter is an interface to write information to temporal headers
type HeaderWriter interface {
Set(string, *commonpb.Payload)
Set(string, *commonpb.Payloads)
}

// HeaderReader is an interface to read information from temporal headers
type HeaderReader interface {
ForEachKey(handler func(string, *commonpb.Payload) error) error
ForEachKey(handler func(string, *commonpb.Payloads) error) error
}

// ContextPropagator is an interface that determines what information from
Expand All @@ -62,7 +62,7 @@ type headerReader struct {
header *commonpb.Header
}

func (hr *headerReader) ForEachKey(handler func(string, *commonpb.Payload) error) error {
func (hr *headerReader) ForEachKey(handler func(string, *commonpb.Payloads) error) error {
if hr.header == nil {
return nil
}
Expand All @@ -83,7 +83,7 @@ type headerWriter struct {
header *commonpb.Header
}

func (hw *headerWriter) Set(key string, value *commonpb.Payload) {
func (hw *headerWriter) Set(key string, value *commonpb.Payloads) {
if hw.header == nil {
return
}
Expand All @@ -93,7 +93,7 @@ func (hw *headerWriter) Set(key string, value *commonpb.Payload) {
// NewHeaderWriter returns a header writer interface
func NewHeaderWriter(header *commonpb.Header) HeaderWriter {
if header != nil && header.Fields == nil {
header.Fields = make(map[string]*commonpb.Payload)
header.Fields = make(map[string]*commonpb.Payloads)
}
return &headerWriter{header}
}
28 changes: 14 additions & 14 deletions internal/headers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,48 +37,48 @@ func TestHeaderWriter(t *testing.T) {
name string
initial *commonpb.Header
expected *commonpb.Header
vals map[string]*commonpb.Payload
vals map[string]*commonpb.Payloads
}{
{
"no values",
&commonpb.Header{
Fields: map[string]*commonpb.Payload{},
Fields: map[string]*commonpb.Payloads{},
},
&commonpb.Header{
Fields: map[string]*commonpb.Payload{},
Fields: map[string]*commonpb.Payloads{},
},
map[string]*commonpb.Payload{},
map[string]*commonpb.Payloads{},
},
{
"add values",
&commonpb.Header{
Fields: map[string]*commonpb.Payload{},
Fields: map[string]*commonpb.Payloads{},
},
&commonpb.Header{
Fields: map[string]*commonpb.Payload{
Fields: map[string]*commonpb.Payloads{
"key1": encodeString(t, "val1"),
"key2": encodeString(t, "val2"),
},
},
map[string]*commonpb.Payload{
map[string]*commonpb.Payloads{
"key1": encodeString(t, "val1"),
"key2": encodeString(t, "val2"),
},
},
{
"overwrite values",
&commonpb.Header{
Fields: map[string]*commonpb.Payload{
Fields: map[string]*commonpb.Payloads{
"key1": encodeString(t, "unexpected"),
},
},
&commonpb.Header{
Fields: map[string]*commonpb.Payload{
Fields: map[string]*commonpb.Payloads{
"key1": encodeString(t, "val1"),
"key2": encodeString(t, "val2"),
},
},
map[string]*commonpb.Payload{
map[string]*commonpb.Payloads{
"key1": encodeString(t, "val1"),
"key2": encodeString(t, "val2"),
},
Expand All @@ -98,7 +98,7 @@ func TestHeaderWriter(t *testing.T) {
}
}

func encodeString(t *testing.T, s string) *commonpb.Payload {
func encodeString(t *testing.T, s string) *commonpb.Payloads {
p, err := DefaultDataConverter.ToData(s)
assert.NoError(t, err)
return p
Expand All @@ -115,7 +115,7 @@ func TestHeaderReader(t *testing.T) {
{
"valid values",
&commonpb.Header{
Fields: map[string]*commonpb.Payload{
Fields: map[string]*commonpb.Payloads{
"key1": encodeString(t, "val1"),
"key2": encodeString(t, "val2"),
},
Expand All @@ -126,7 +126,7 @@ func TestHeaderReader(t *testing.T) {
{
"invalid values",
&commonpb.Header{
Fields: map[string]*commonpb.Payload{
Fields: map[string]*commonpb.Payloads{
"key1": encodeString(t, "val1"),
"key2": encodeString(t, "val2"),
},
Expand All @@ -141,7 +141,7 @@ func TestHeaderReader(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
t.Parallel()
reader := NewHeaderReader(test.header)
err := reader.ForEachKey(func(key string, _ *commonpb.Payload) error {
err := reader.ForEachKey(func(key string, _ *commonpb.Payloads) error {
if _, ok := test.keys[key]; !ok {
return assert.AnError
}
Expand Down
Loading

0 comments on commit 6bcd477

Please sign in to comment.