Skip to content

Commit

Permalink
[v2][adjuster] Implement span ID uniquifier adjuster to operate on ot…
Browse files Browse the repository at this point in the history
…lp data model (#6367)

## Which problem is this PR solving?
- Towards #6344 

## Description of the changes
- Implement the Span ID Uniquifier adjuster to operate on the OTLP data
model.

## How was this change tested?
- Unit tests

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `npm run lint` and `npm run test`

---------

Signed-off-by: Mahad Zaryab <[email protected]>
  • Loading branch information
mahadzaryab1 authored Dec 18, 2024
1 parent dd799a0 commit 376061e
Show file tree
Hide file tree
Showing 6 changed files with 273 additions and 0 deletions.
8 changes: 8 additions & 0 deletions cmd/query/app/querysvc/adjuster/adjuster.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ type Adjuster interface {
Adjust(ptrace.Traces) error
}

// Func is a type alias that wraps a function and makes an Adjuster from it.
type Func func(traces ptrace.Traces) error

// Adjust implements Adjuster interface for the Func alias.
func (f Func) Adjust(traces ptrace.Traces) error {
return f(traces)
}

// Sequence creates an adjuster that combines a series of adjusters
// applied in order. Errors from each step are accumulated and returned
// in the end as a single wrapper error. Errors do not interrupt the
Expand Down
2 changes: 2 additions & 0 deletions cmd/query/app/querysvc/adjuster/ipattribute.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"go.opentelemetry.io/collector/pdata/ptrace"
)

var _ Adjuster = (*IPAttributeAdjuster)(nil)

var ipAttributesToCorrect = map[string]struct{}{
"ip": {},
"peer.ipv4": {},
Expand Down
2 changes: 2 additions & 0 deletions cmd/query/app/querysvc/adjuster/resourceattributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/jaegertracing/jaeger/pkg/otelsemconv"
)

var _ Adjuster = (*ResourceAttributesAdjuster)(nil)

var libraryKeys = map[string]struct{}{
string(otelsemconv.TelemetrySDKLanguageKey): {},
string(otelsemconv.TelemetrySDKNameKey): {},
Expand Down
155 changes: 155 additions & 0 deletions cmd/query/app/querysvc/adjuster/spaniduniquifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// Copyright (c) 2019 The Jaeger Authors.
// Copyright (c) 2017 Uber Technologies, Inc.
// SPDX-License-Identifier: Apache-2.0

package adjuster

import (
"errors"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/internal/jptrace"
)

var errTooManySpans = errors.New("cannot assign unique span ID, too many spans in the trace")

// SpanIDUniquifier returns an adjuster that changes span ids for server
// spans (i.e. spans with tag: span.kind == server) if there is another
// client span that shares the same span ID. This is needed to deal with
// Zipkin-style clients that reuse the same span ID for both client and server
// side of an RPC call. Jaeger UI expects all spans to have unique IDs.
//
// This adjuster never returns any errors. Instead it records any issues
// it encounters in Span.Warnings.
func SpanIDUniquifier() Adjuster {
return Func(func(traces ptrace.Traces) error {
adjuster := spanIDDeduper{
spansByID: make(map[pcommon.SpanID][]ptrace.Span),
maxUsedID: pcommon.NewSpanIDEmpty(),
}
return adjuster.adjust(traces)
})
}

type spanIDDeduper struct {
spansByID map[pcommon.SpanID][]ptrace.Span
maxUsedID pcommon.SpanID
}

func (d *spanIDDeduper) adjust(traces ptrace.Traces) error {
d.groupSpansByID(traces)
d.uniquifyServerSpanIDs(traces)
return nil
}

// groupSpansByID groups spans with the same ID returning a map id -> []Span
func (d *spanIDDeduper) groupSpansByID(traces ptrace.Traces) {
resourceSpans := traces.ResourceSpans()
for i := 0; i < resourceSpans.Len(); i++ {
rs := resourceSpans.At(i)
scopeSpans := rs.ScopeSpans()
for j := 0; j < scopeSpans.Len(); j++ {
ss := scopeSpans.At(j)
spans := ss.Spans()
for k := 0; k < spans.Len(); k++ {
span := spans.At(k)
if spans, ok := d.spansByID[span.SpanID()]; ok {
d.spansByID[span.SpanID()] = append(spans, span)
} else {
d.spansByID[span.SpanID()] = []ptrace.Span{span}
}
}
}
}
}

func (d *spanIDDeduper) isSharedWithClientSpan(spanID pcommon.SpanID) bool {
spans := d.spansByID[spanID]
for _, span := range spans {
if span.Kind() == ptrace.SpanKindClient {
return true
}
}
return false
}

func (d *spanIDDeduper) uniquifyServerSpanIDs(traces ptrace.Traces) {
oldToNewSpanIDs := make(map[pcommon.SpanID]pcommon.SpanID)
resourceSpans := traces.ResourceSpans()
for i := 0; i < resourceSpans.Len(); i++ {
rs := resourceSpans.At(i)
scopeSpans := rs.ScopeSpans()
for j := 0; j < scopeSpans.Len(); j++ {
ss := scopeSpans.At(j)
spans := ss.Spans()
for k := 0; k < spans.Len(); k++ {
span := spans.At(k)
// only replace span IDs for server-side spans that share the ID with something else
if span.Kind() == ptrace.SpanKindServer && d.isSharedWithClientSpan(span.SpanID()) {
newID, err := d.makeUniqueSpanID()
if err != nil {
jptrace.AddWarning(span, err.Error())
continue
}
oldToNewSpanIDs[span.SpanID()] = newID
span.SetParentSpanID(span.SpanID()) // previously shared ID is the new parent
span.SetSpanID(newID)
}
}
}
}
d.swapParentIDs(traces, oldToNewSpanIDs)
}

// swapParentIDs corrects ParentSpanID of all spans that are children of the server
// spans whose IDs we made unique.
func (*spanIDDeduper) swapParentIDs(
traces ptrace.Traces,
oldToNewSpanIDs map[pcommon.SpanID]pcommon.SpanID,
) {
resourceSpans := traces.ResourceSpans()
for i := 0; i < resourceSpans.Len(); i++ {
rs := resourceSpans.At(i)
scopeSpans := rs.ScopeSpans()
for j := 0; j < scopeSpans.Len(); j++ {
ss := scopeSpans.At(j)
spans := ss.Spans()
for k := 0; k < spans.Len(); k++ {
span := spans.At(k)
if parentID, ok := oldToNewSpanIDs[span.ParentSpanID()]; ok {
if span.SpanID() != parentID {
span.SetParentSpanID(parentID)
}
}
}
}
}
}

// makeUniqueSpanID returns a new ID that is not used in the trace,
// or an error if such ID cannot be generated, which is unlikely,
// given that the whole space of span IDs is 2^64.
func (d *spanIDDeduper) makeUniqueSpanID() (pcommon.SpanID, error) {
id := incrementSpanID(d.maxUsedID)
for id != pcommon.NewSpanIDEmpty() {
if _, exists := d.spansByID[id]; !exists {
d.maxUsedID = id
return id, nil
}
id = incrementSpanID(id)
}
return pcommon.NewSpanIDEmpty(), errTooManySpans
}

func incrementSpanID(spanID pcommon.SpanID) pcommon.SpanID {
newID := spanID
for i := len(newID) - 1; i >= 0; i-- {
newID[i]++
if newID[i] != 0 {
break
}
}
return newID
}
104 changes: 104 additions & 0 deletions cmd/query/app/querysvc/adjuster/spaniduniquifier_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright (c) 2019 The Jaeger Authors.
// Copyright (c) 2017 Uber Technologies, Inc.
// SPDX-License-Identifier: Apache-2.0

package adjuster

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/internal/jptrace"
)

var (
clientSpanID = pcommon.SpanID([]byte{0, 0, 0, 0, 0, 0, 0, 1})
anotherSpanID = pcommon.SpanID([]byte{1, 0, 0, 0, 0, 0, 0, 0})
)

func makeTraces() ptrace.Traces {
traceID := pcommon.TraceID([]byte{0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 3})

traces := ptrace.NewTraces()
spans := traces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans()

clientSpan := spans.AppendEmpty()
clientSpan.SetTraceID(traceID)
clientSpan.SetSpanID(clientSpanID)
clientSpan.SetKind(ptrace.SpanKindClient)

serverSpan := spans.AppendEmpty()
serverSpan.SetTraceID(traceID)
serverSpan.SetSpanID(clientSpanID) // shared span ID
serverSpan.SetKind(ptrace.SpanKindServer)

anotherSpan := spans.AppendEmpty()
anotherSpan.SetTraceID(traceID)
anotherSpan.SetSpanID(anotherSpanID)
anotherSpan.SetParentSpanID(clientSpanID)

return traces
}

func TestSpanIDUniquifierTriggered(t *testing.T) {
traces := makeTraces()
deduper := SpanIDUniquifier()
require.NoError(t, deduper.Adjust(traces))

spans := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans()

clientSpan := spans.At(0)
assert.Equal(t, clientSpanID, clientSpan.SpanID(), "client span ID should not change")

serverSpan := spans.At(1)
assert.EqualValues(t, []byte{0, 0, 0, 0, 0, 0, 0, 2}, serverSpan.SpanID(), "server span ID should be reassigned")
assert.EqualValues(t, clientSpanID, serverSpan.ParentSpanID(), "next server span should be this server span's parent")

thirdSpan := spans.At(2)
assert.Equal(t, anotherSpanID, thirdSpan.SpanID(), "3rd span ID should not change")
assert.Equal(t, serverSpan.SpanID(), thirdSpan.ParentSpanID(), "parent of 3rd span should change to new spanID")
}

func TestSpanIDUniquifierNotTriggered(t *testing.T) {
traces := makeTraces()
spans := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans()

// only copy server span and random span
newSpans := ptrace.NewSpanSlice()
spans.At(1).CopyTo(newSpans.AppendEmpty())
spans.At(2).CopyTo(newSpans.AppendEmpty())
newSpans.CopyTo(spans)

deduper := SpanIDUniquifier()
require.NoError(t, deduper.Adjust(traces))

gotSpans := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans()

serverSpanID := clientSpanID // for better readability
serverSpan := gotSpans.At(0)
assert.Equal(t, serverSpanID, serverSpan.SpanID(), "server span ID should be unchanged")

thirdSpan := gotSpans.At(1)
assert.Equal(t, anotherSpanID, thirdSpan.SpanID(), "3rd span ID should not change")
}

func TestSpanIDUniquifierError(t *testing.T) {
traces := makeTraces()

maxID := pcommon.SpanID([8]byte{255, 255, 255, 255, 255, 255, 255, 255})

deduper := &spanIDDeduper{
spansByID: make(map[pcommon.SpanID][]ptrace.Span),
// instead of 0 start at the last possible value to cause an error
maxUsedID: maxID,
}
require.NoError(t, deduper.adjust(traces))

span := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(1)
warnings := jptrace.GetWarnings(span)
require.Equal(t, []string{"cannot assign unique span ID, too many spans in the trace"}, warnings)
}
2 changes: 2 additions & 0 deletions cmd/query/app/querysvc/adjuster/spanlinks.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ const (
invalidSpanLinkWarning = "Invalid span link removed"
)

var _ Adjuster = (*LinksAdjuster)(nil)

// SpanLinks creates an adjuster that removes span links with empty trace IDs.
func SpanLinks() LinksAdjuster {
return LinksAdjuster{}
Expand Down

0 comments on commit 376061e

Please sign in to comment.