Skip to content

Commit

Permalink
[v2][adjuster] Rework adjuster interface and refactor adjusters to re…
Browse files Browse the repository at this point in the history
…turn implemented struct (#6362)

## Which problem is this PR solving?
- Towards #6344 

## Description of the changes
- This PR performs the following refactorings to the adjuster package
  - Remove the `Func` alias 
- Change the implemented adjusters to return a struct that implements
the Adjuster interface
- Change the interface to only return an error to indicate that traces
are modified in place
  - Move the warnings utility to `cmd/query/app/internal/jotlp`

## How was this change tested?
- CI and unit tests

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `npm run lint` and `npm run test`

---------

Signed-off-by: Mahad Zaryab <[email protected]>
  • Loading branch information
mahadzaryab1 authored Dec 16, 2024
1 parent dad636b commit ef799d1
Show file tree
Hide file tree
Showing 12 changed files with 195 additions and 135 deletions.
27 changes: 9 additions & 18 deletions cmd/query/app/querysvc/adjuster/adjuster.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,12 @@ import (
"go.opentelemetry.io/collector/pdata/ptrace"
)

// Adjuster defines an interface for modifying a trace object.
// It returns the adjusted trace object, which is also updated in place.
// If the adjuster encounters an issue that prevents it from applying
// modifications, it should return the original trace object along with an error.
// Adjuster is an interface for modifying a trace object in place.
// If an issue is encountered that prevents modifications, an error should be returned.
// The caller must ensure that all spans in the ptrace.Traces argument
// belong to the same trace and represent the complete trace.
type Adjuster interface {
Adjust(ptrace.Traces) (ptrace.Traces, error)
}

// Func is a type alias that wraps a function and makes an Adjuster from it.
type Func func(traces ptrace.Traces) (ptrace.Traces, error)

// Adjust implements Adjuster interface for the Func alias.
func (f Func) Adjust(traces ptrace.Traces) (ptrace.Traces, error) {
return f(traces)
Adjust(ptrace.Traces) error
}

// Sequence creates an adjuster that combines a series of adjusters
Expand All @@ -44,17 +36,16 @@ type sequence struct {
failFast bool
}

func (c sequence) Adjust(traces ptrace.Traces) (ptrace.Traces, error) {
func (c sequence) Adjust(traces ptrace.Traces) error {
var errs []error
for _, adjuster := range c.adjusters {
var err error
traces, err = adjuster.Adjust(traces)
err := adjuster.Adjust(traces)
if err != nil {
if c.failFast {
return traces, err
return err
}
errs = append(errs, err)
}
}
return traces, errors.Join(errs...)
return errors.Join(errs...)
}
43 changes: 22 additions & 21 deletions cmd/query/app/querysvc/adjuster/adjuster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package adjuster_test

import (
"errors"
"fmt"
"testing"

Expand All @@ -16,21 +15,23 @@ import (
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc/adjuster"
)

func TestSequences(t *testing.T) {
// mock adjuster that increments last byte of span ID
adj := adjuster.Func(func(trace ptrace.Traces) (ptrace.Traces, error) {
span := trace.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0)
spanId := span.SpanID()
spanId[7]++
span.SetSpanID(spanId)
return trace, nil
})
type mockAdjuster struct{}

func (mockAdjuster) Adjust(traces ptrace.Traces) error {
span := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0)
spanId := span.SpanID()
spanId[7]++
span.SetSpanID(spanId)
return nil
}

type mockAdjusterError struct{}

adjErr := errors.New("mock adjuster error")
failingAdj := adjuster.Func(func(trace ptrace.Traces) (ptrace.Traces, error) {
return trace, adjErr
})
func (mockAdjusterError) Adjust(ptrace.Traces) error {
return assert.AnError
}

func TestSequences(t *testing.T) {
tests := []struct {
name string
adjuster adjuster.Adjuster
Expand All @@ -39,14 +40,14 @@ func TestSequences(t *testing.T) {
}{
{
name: "normal sequence",
adjuster: adjuster.Sequence(adj, failingAdj, adj, failingAdj),
err: fmt.Sprintf("%s\n%s", adjErr, adjErr),
adjuster: adjuster.Sequence(mockAdjuster{}, mockAdjusterError{}, mockAdjuster{}, mockAdjusterError{}),
err: fmt.Sprintf("%s\n%s", assert.AnError, assert.AnError),
lastSpanID: [8]byte{0, 0, 0, 0, 0, 0, 0, 2},
},
{
name: "fail fast sequence",
adjuster: adjuster.FailFastSequence(adj, failingAdj, adj, failingAdj),
err: adjErr.Error(),
adjuster: adjuster.FailFastSequence(mockAdjuster{}, mockAdjusterError{}, mockAdjuster{}, mockAdjusterError{}),
err: assert.AnError.Error(),
lastSpanID: [8]byte{0, 0, 0, 0, 0, 0, 0, 1},
},
}
Expand All @@ -57,12 +58,12 @@ func TestSequences(t *testing.T) {
span := trace.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty()
span.SetSpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 0})

adjTrace, err := test.adjuster.Adjust(trace)
adjTraceSpan := adjTrace.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0)
err := test.adjuster.Adjust(trace)
require.EqualError(t, err, test.err)

adjTraceSpan := trace.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0)
assert.Equal(t, span, adjTraceSpan)
assert.EqualValues(t, test.lastSpanID, span.SpanID())
require.EqualError(t, err, test.err)
})
}
}
41 changes: 21 additions & 20 deletions cmd/query/app/querysvc/adjuster/ipattribute.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,31 @@ var ipAttributesToCorrect = map[string]struct{}{
// IPAttribute returns an adjuster that replaces numeric "ip" attributes,
// which usually contain IPv4 packed into uint32, with their string
// representation (e.g. "8.8.8.8"").
func IPAttribute() Adjuster {
return Func(func(traces ptrace.Traces) (ptrace.Traces, error) {
adjuster := ipAttributeAdjuster{}
resourceSpans := traces.ResourceSpans()
for i := 0; i < resourceSpans.Len(); i++ {
rs := resourceSpans.At(i)
adjuster.adjust(rs.Resource().Attributes())
scopeSpans := rs.ScopeSpans()
for j := 0; j < scopeSpans.Len(); j++ {
ss := scopeSpans.At(j)
spans := ss.Spans()
for k := 0; k < spans.Len(); k++ {
span := spans.At(k)
adjuster.adjust(span.Attributes())
}
func IPAttribute() IPAttributeAdjuster {
return IPAttributeAdjuster{}
}

type IPAttributeAdjuster struct{}

func (ia IPAttributeAdjuster) Adjust(traces ptrace.Traces) error {
resourceSpans := traces.ResourceSpans()
for i := 0; i < resourceSpans.Len(); i++ {
rs := resourceSpans.At(i)
ia.adjustAttributes(rs.Resource().Attributes())
scopeSpans := rs.ScopeSpans()
for j := 0; j < scopeSpans.Len(); j++ {
ss := scopeSpans.At(j)
spans := ss.Spans()
for k := 0; k < spans.Len(); k++ {
span := spans.At(k)
ia.adjustAttributes(span.Attributes())
}
}
return traces, nil
})
}
return nil
}

type ipAttributeAdjuster struct{}

func (ipAttributeAdjuster) adjust(attributes pcommon.Map) {
func (IPAttributeAdjuster) adjustAttributes(attributes pcommon.Map) {
adjusted := make(map[string]string)
attributes.Range(func(k string, v pcommon.Value) bool {
if _, ok := ipAttributesToCorrect[k]; !ok {
Expand Down
4 changes: 2 additions & 2 deletions cmd/query/app/querysvc/adjuster/ipattribute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ func TestIPAttributeAdjuster(t *testing.T) {
}
}

trace, err := IPAttribute().Adjust(traces)
err := IPAttribute().Adjust(traces)
require.NoError(t, err)

resourceSpan := trace.ResourceSpans().At(0)
resourceSpan := traces.ResourceSpans().At(0)
assert.Equal(t, 3, resourceSpan.Resource().Attributes().Len())

assertAttribute(resourceSpan.Resource().Attributes(), "a", 42)
Expand Down
7 changes: 4 additions & 3 deletions cmd/query/app/querysvc/adjuster/resourceattributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/internal/jptrace"
"github.com/jaegertracing/jaeger/pkg/otelsemconv"
)

Expand All @@ -28,7 +29,7 @@ func ResourceAttributes() ResourceAttributesAdjuster {

type ResourceAttributesAdjuster struct{}

func (o ResourceAttributesAdjuster) Adjust(traces ptrace.Traces) (ptrace.Traces, error) {
func (o ResourceAttributesAdjuster) Adjust(traces ptrace.Traces) error {
resourceSpans := traces.ResourceSpans()
for i := 0; i < resourceSpans.Len(); i++ {
rs := resourceSpans.At(i)
Expand All @@ -43,7 +44,7 @@ func (o ResourceAttributesAdjuster) Adjust(traces ptrace.Traces) (ptrace.Traces,
}
}
}
return traces, nil
return nil
}

func (ResourceAttributesAdjuster) moveAttributes(span ptrace.Span, resource pcommon.Resource) {
Expand All @@ -57,7 +58,7 @@ func (ResourceAttributesAdjuster) moveAttributes(span ptrace.Span, resource pcom
for k, v := range replace {
existing, ok := resource.Attributes().Get(k)
if ok && existing.AsRaw() != v.AsRaw() {
addWarning(span, "conflicting values between Span and Resource for attribute "+k)
jptrace.AddWarning(span, "conflicting values between Span and Resource for attribute "+k)
continue
}
v.CopyTo(resource.Attributes().PutEmpty(k))
Expand Down
35 changes: 16 additions & 19 deletions cmd/query/app/querysvc/adjuster/resourceattributes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/internal/jptrace"
"github.com/jaegertracing/jaeger/pkg/otelsemconv"
)

Expand All @@ -25,10 +26,9 @@ func TestResourceAttributesAdjuster_SpanWithLibraryAttributes(t *testing.T) {
span.Attributes().PutStr("another_key", "another_value")

adjuster := ResourceAttributes()
result, err := adjuster.Adjust(traces)
require.NoError(t, err)
require.NoError(t, adjuster.Adjust(traces))

resultSpanAttributes := result.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes()
resultSpanAttributes := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes()
require.Equal(t, 2, resultSpanAttributes.Len())
val, ok := resultSpanAttributes.Get("random_key")
require.True(t, ok)
Expand All @@ -38,7 +38,7 @@ func TestResourceAttributesAdjuster_SpanWithLibraryAttributes(t *testing.T) {
require.True(t, ok)
require.Equal(t, "another_value", val.Str())

resultResourceAttributes := result.ResourceSpans().At(0).Resource().Attributes()
resultResourceAttributes := traces.ResourceSpans().At(0).Resource().Attributes()

val, ok = resultResourceAttributes.Get(string(otelsemconv.TelemetrySDKLanguageKey))
require.True(t, ok)
Expand Down Expand Up @@ -68,10 +68,9 @@ func TestResourceAttributesAdjuster_SpanWithoutLibraryAttributes(t *testing.T) {
span.Attributes().PutStr("random_key", "random_value")

adjuster := ResourceAttributes()
result, err := adjuster.Adjust(traces)
require.NoError(t, err)
require.NoError(t, adjuster.Adjust(traces))

resultSpanAttributes := result.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes()
resultSpanAttributes := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes()
require.Equal(t, 1, resultSpanAttributes.Len())
val, ok := resultSpanAttributes.Get("random_key")
require.True(t, ok)
Expand All @@ -87,10 +86,10 @@ func TestResourceAttributesAdjuster_SpanWithConflictingLibraryAttributes(t *test
span.Attributes().PutStr(string(otelsemconv.TelemetrySDKLanguageKey), "Java")

adjuster := ResourceAttributes()
result, err := adjuster.Adjust(traces)
require.NoError(t, err)
require.NoError(t, adjuster.Adjust(traces))

resultSpanAttributes := result.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes()
resultSpan := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0)
resultSpanAttributes := resultSpan.Attributes()
require.Equal(t, 3, resultSpanAttributes.Len())
val, ok := resultSpanAttributes.Get("random_key")
require.True(t, ok)
Expand All @@ -101,13 +100,12 @@ func TestResourceAttributesAdjuster_SpanWithConflictingLibraryAttributes(t *test
require.True(t, ok)
require.Equal(t, "Java", val.Str())

val, ok = resultSpanAttributes.Get("jaeger.adjuster.warning")
warnings := jptrace.GetWarnings(resultSpan)
require.True(t, ok)
warnings := val.Slice()
require.Equal(t, 1, warnings.Len())
require.Equal(t, "conflicting values between Span and Resource for attribute telemetry.sdk.language", warnings.At(0).Str())
require.Len(t, warnings, 1)
require.Equal(t, "conflicting values between Span and Resource for attribute telemetry.sdk.language", warnings[0])

resultResourceAttributes := result.ResourceSpans().At(0).Resource().Attributes()
resultResourceAttributes := traces.ResourceSpans().At(0).Resource().Attributes()
val, ok = resultResourceAttributes.Get(string(otelsemconv.TelemetrySDKLanguageKey))
require.True(t, ok)
require.Equal(t, "Go", val.Str())
Expand All @@ -122,16 +120,15 @@ func TestResourceAttributesAdjuster_SpanWithNonConflictingLibraryAttributes(t *t
span.Attributes().PutStr(string(otelsemconv.TelemetrySDKLanguageKey), "Go")

adjuster := ResourceAttributes()
result, err := adjuster.Adjust(traces)
require.NoError(t, err)
require.NoError(t, adjuster.Adjust(traces))

resultSpanAttributes := result.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes()
resultSpanAttributes := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes()
require.Equal(t, 1, resultSpanAttributes.Len())
val, ok := resultSpanAttributes.Get("random_key")
require.True(t, ok)
require.Equal(t, "random_value", val.Str())

resultResourceAttributes := result.ResourceSpans().At(0).Resource().Attributes()
resultResourceAttributes := traces.ResourceSpans().At(0).Resource().Attributes()
val, ok = resultResourceAttributes.Get(string(otelsemconv.TelemetrySDKLanguageKey))
require.True(t, ok)
require.Equal(t, "Go", val.Str())
Expand Down
43 changes: 22 additions & 21 deletions cmd/query/app/querysvc/adjuster/spanlinks.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,35 +8,36 @@ import (
)

// SpanLinks creates an adjuster that removes span links with empty trace IDs.
func SpanLinks() Adjuster {
return Func(func(traces ptrace.Traces) (ptrace.Traces, error) {
adjuster := linksAdjuster{}
resourceSpans := traces.ResourceSpans()
for i := 0; i < resourceSpans.Len(); i++ {
rs := resourceSpans.At(i)
scopeSpans := rs.ScopeSpans()
for j := 0; j < scopeSpans.Len(); j++ {
ss := scopeSpans.At(j)
spans := ss.Spans()
for k := 0; k < spans.Len(); k++ {
span := spans.At(k)
adjuster.adjust(span)
}
func SpanLinks() LinksAdjuster {
return LinksAdjuster{}
}

type LinksAdjuster struct{}

func (la LinksAdjuster) Adjust(traces ptrace.Traces) error {
resourceSpans := traces.ResourceSpans()
for i := 0; i < resourceSpans.Len(); i++ {
rs := resourceSpans.At(i)
scopeSpans := rs.ScopeSpans()
for j := 0; j < scopeSpans.Len(); j++ {
ss := scopeSpans.At(j)
spans := ss.Spans()
for k := 0; k < spans.Len(); k++ {
span := spans.At(k)
la.adjust(span)
}
}
return traces, nil
})
}
return nil
}

type linksAdjuster struct{}

// adjust removes invalid links from a span.
func (l linksAdjuster) adjust(span ptrace.Span) {
func (la LinksAdjuster) adjust(span ptrace.Span) {
links := span.Links()
validLinks := ptrace.NewSpanLinkSlice()
for i := 0; i < links.Len(); i++ {
link := links.At(i)
if l.valid(link) {
if la.valid(link) {
newLink := validLinks.AppendEmpty()
link.CopyTo(newLink)
}
Expand All @@ -45,6 +46,6 @@ func (l linksAdjuster) adjust(span ptrace.Span) {
}

// valid checks if a span link's TraceID is not empty.
func (linksAdjuster) valid(link ptrace.SpanLink) bool {
func (LinksAdjuster) valid(link ptrace.SpanLink) bool {
return !link.TraceID().IsEmpty()
}
Loading

0 comments on commit ef799d1

Please sign in to comment.