Skip to content

Commit

Permalink
Replace Record limit methods with DroppedAttributes (#5190)
Browse files Browse the repository at this point in the history
* Replace Record lim methods with DroppedAttributes

* Add changelog entry

* Add TestRecordDroppedAttributes

* Add TestRecordCompactAttr

* Add an indexPool

* Fix gramatical error

* Apply feedback

Reduce indentation level.

* Apply feedback

Comment compactAttr and deduplicate.

* Deduplicate all attributes when added

* Comment why head is not used

* Clarify comments

* Move TestAllocationLimits to new file

Do not run this test when the race detector is on.

* Comment follow-up task
  • Loading branch information
MrAlias authored Apr 16, 2024
1 parent 1ff2e71 commit dbe27d4
Show file tree
Hide file tree
Showing 6 changed files with 347 additions and 63 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,19 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Added

- Add `Recorder` in `go.opentelemetry.io/otel/log/logtest` to facilitate testing the log bridge implementations. (#5134)
- The `DroppedAttributes` is added to the `"go.opentelemetry.io/otel/sdk/log".Record` type.
This method can be used to determine how many log attributes were dropped from the `Record` due to limits being exceeded. (#5190)
- Add span flags to OTLP spans and links exported by `go.opentelemetry.io/otel/exporters/otlp/otlptrace`. (#5194)

### Changed

- Update `go.opentelemetry.io/proto/otlp` from v1.1.0 to v1.2.0. (#5177)

### Removed

- The `AttributeCountLimit` on the `"go.opentelemetry.io/otel/sdk/log".Record` type is removed. (#5190)
- The `AttributeValueLengthLimit` on the `"go.opentelemetry.io/otel/sdk/log".Record` type is removed. (#5190)

## [1.25.0/0.47.0/0.0.8/0.1.0-alpha] 2024-04-05

### Added
Expand Down
6 changes: 2 additions & 4 deletions exporters/stdout/stdoutlog/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,8 @@ func (e *Exporter) newRecordJSON(r sdklog.Record) recordJSON {

Attributes: make([]log.KeyValue, 0, r.AttributesLen()),

Resource: r.Resource(),
Scope: r.InstrumentationScope(),
AttributeValueLengthLimit: r.AttributeValueLengthLimit(),
AttributeCountLimit: r.AttributeCountLimit(),
Resource: r.Resource(),
Scope: r.InstrumentationScope(),
}

r.WalkAttributes(func(kv log.KeyValue) bool {
Expand Down
46 changes: 46 additions & 0 deletions sdk/log/logger_norace_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:build !race

package log

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/sdk/instrumentation"
)

func TestAllocationLimits(t *testing.T) {
// This test is not run with a race detector. The sync.Pool used by parts
// of the SDK has memory optimizations removed for the race detector. Do
// not test performance of the SDK in that state.

const runs = 10

logger := newLogger(NewLoggerProvider(), instrumentation.Scope{})

r := log.Record{}
r.SetTimestamp(time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC))
r.SetObservedTimestamp(time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC))
r.SetBody(log.StringValue("testing body value"))
r.SetSeverity(log.SeverityInfo)
r.SetSeverityText("testing text")

r.AddAttributes(
log.String("k1", "str"),
log.Float64("k2", 1.0),
log.Int("k3", 2),
log.Bool("k4", true),
log.Bytes("k5", []byte{1}),
)

assert.Equal(t, 0.0, testing.AllocsPerRun(runs, func() {
logger.newRecord(context.Background(), r)
}), "newRecord")
}
25 changes: 0 additions & 25 deletions sdk/log/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,28 +273,3 @@ func TestLoggerEnabled(t *testing.T) {
})
}
}

func TestAllocationLimits(t *testing.T) {
const runs = 10

logger := newLogger(NewLoggerProvider(), instrumentation.Scope{})

r := log.Record{}
r.SetTimestamp(time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC))
r.SetObservedTimestamp(time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC))
r.SetBody(log.StringValue("testing body value"))
r.SetSeverity(log.SeverityInfo)
r.SetSeverityText("testing text")

r.AddAttributes(
log.String("k1", "str"),
log.Float64("k2", 1.0),
log.Int("k3", 2),
log.Bool("k4", true),
log.Bytes("k5", []byte{1}),
)

assert.Equal(t, 0.0, testing.AllocsPerRun(runs, func() {
logger.newRecord(context.Background(), r)
}), "newRecord")
}
174 changes: 154 additions & 20 deletions sdk/log/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package log // import "go.opentelemetry.io/otel/sdk/log"

import (
"slices"
"sync"
"time"

"go.opentelemetry.io/otel/log"
Expand All @@ -19,6 +20,20 @@ import (
// cover 95% of all use-cases (https://go.dev/blog/slog#performance).
const attributesInlineCount = 5

// indexPool is a pool of index maps used for de-duplication.
var indexPool = sync.Pool{
New: func() any { return make(map[string]int) },
}

func getIndex() map[string]int {
return indexPool.Get().(map[string]int)
}

func putIndex(index map[string]int) {
clear(index)
indexPool.Put(index)
}

// Record is a log record emitted by the Logger.
type Record struct {
// Do not embed the log.Record. Attributes need to be overwrite-able and
Expand Down Expand Up @@ -48,6 +63,10 @@ type Record struct {
// - Unused array elements are zero-ed. Used to detect mistakes.
back []log.KeyValue

// dropped is the count of attributes that have been dropped when limits
// were reached.
dropped int

traceID trace.TraceID
spanID trace.SpanID
traceFlags trace.TraceFlags
Expand Down Expand Up @@ -131,6 +150,99 @@ func (r *Record) WalkAttributes(f func(log.KeyValue) bool) {

// AddAttributes adds attributes to the log record.
func (r *Record) AddAttributes(attrs ...log.KeyValue) {
n := r.AttributesLen()
if n == 0 {
// Avoid the more complex duplicate map lookups bellow.
attrs, r.dropped = dedup(attrs)

var drop int
attrs, drop = head(attrs, r.attributeCountLimit)
r.dropped += drop

r.addAttrs(attrs)
return
}

// Used to find duplicates between attrs and existing attributes in r.
rIndex := r.attrIndex()
defer putIndex(rIndex)

// Unique attrs that need to be added to r. This uses the same underlying
// array as attrs.
//
// Note, do not iterate attrs twice by just calling dedup(attrs) here.
unique := attrs[:0]
// Used to find duplicates within attrs itself. The index value is the
// index of the element in unique.
uIndex := getIndex()
defer putIndex(uIndex)

// Deduplicate attrs within the scope of all existing attributes.
for _, a := range attrs {
// Last-value-wins for any duplicates in attrs.
idx, found := uIndex[a.Key]
if found {
r.dropped++
unique[idx] = a
continue
}

idx, found = rIndex[a.Key]
if found {
// New attrs overwrite any existing with the same key.
r.dropped++
if idx < 0 {
r.front[-(idx + 1)] = a
} else {
r.back[idx] = a
}
} else {
// Unique attribute.
// TODO: apply truncation to string and []string values.
// TODO: deduplicate map values.
unique = append(unique, a)
uIndex[a.Key] = len(unique) - 1
}
}
attrs = unique

if r.attributeCountLimit > 0 && n+len(attrs) > r.attributeCountLimit {
// Truncate the now unique attributes to comply with limit.
//
// Do not use head(attrs, r.attributeCountLimit - n) here. If
// (r.attributeCountLimit - n) <= 0 attrs needs to be emptied.
last := max(0, (r.attributeCountLimit - n))
r.dropped += len(attrs) - last
attrs = attrs[:last]
}

r.addAttrs(attrs)
}

// attrIndex returns an index map for all attributes in the Record r. The index
// maps the attribute key to location the attribute is stored. If the value is
// < 0 then -(value + 1) (e.g. -1 -> 0, -2 -> 1, -3 -> 2) represents the index
// in r.nFront. Otherwise, the index is the exact index of r.back.
//
// The returned index is taken from the indexPool. It is the callers
// responsibility to return the index to that pool (putIndex) when done.
func (r *Record) attrIndex() map[string]int {
index := getIndex()
for i := 0; i < r.nFront; i++ {
key := r.front[i].Key
index[key] = -i - 1 // stored in front: negative index.
}
for i := 0; i < len(r.back); i++ {
key := r.back[i].Key
index[key] = i // stored in back: positive index.
}
return index
}

// addAttrs adds attrs to the Record r. This does not validate any limits or
// duplication of attributes, these tasks are left to the caller to handle
// prior to calling.
func (r *Record) addAttrs(attrs []log.KeyValue) {
var i int
for i = 0; i < len(attrs) && r.nFront < len(r.front); i++ {
a := attrs[i]
Expand All @@ -144,6 +256,14 @@ func (r *Record) AddAttributes(attrs ...log.KeyValue) {

// SetAttributes sets (and overrides) attributes to the log record.
func (r *Record) SetAttributes(attrs ...log.KeyValue) {
// TODO: apply truncation to string and []string values.
// TODO: deduplicate map values.
attrs, r.dropped = dedup(attrs)

var drop int
attrs, drop = head(attrs, r.attributeCountLimit)
r.dropped += drop

r.nFront = 0
var i int
for i = 0; i < len(attrs) && r.nFront < len(r.front); i++ {
Expand All @@ -155,11 +275,45 @@ func (r *Record) SetAttributes(attrs ...log.KeyValue) {
r.back = slices.Clone(attrs[i:])
}

// head returns the first n values of kvs along with the number of elements
// dropped. If n is less than or equal to zero, kvs is returned with 0.
func head(kvs []log.KeyValue, n int) (out []log.KeyValue, dropped int) {
if n > 0 && len(kvs) > n {
return kvs[:n], len(kvs) - n
}
return kvs, 0
}

// dedup deduplicates kvs front-to-back with the last value saved.
func dedup(kvs []log.KeyValue) (unique []log.KeyValue, dropped int) {
index := getIndex()
defer putIndex(index)

unique = kvs[:0] // Use the same underlying array as kvs.
for _, a := range kvs {
idx, found := index[a.Key]
if found {
dropped++
unique[idx] = a
} else {
unique = append(unique, a)
index[a.Key] = len(unique) - 1
}
}
return unique, dropped
}

// AttributesLen returns the number of attributes in the log record.
func (r *Record) AttributesLen() int {
return r.nFront + len(r.back)
}

// DroppedAttributes returns the number of attributes dropped due to limits
// being reached.
func (r *Record) DroppedAttributes() int {
return r.dropped
}

// TraceID returns the trace ID or empty array.
func (r *Record) TraceID() trace.TraceID {
return r.traceID
Expand Down Expand Up @@ -206,26 +360,6 @@ func (r *Record) InstrumentationScope() instrumentation.Scope {
return *r.scope
}

// AttributeValueLengthLimit is the maximum allowed attribute value length.
//
// This limit only applies to string and string slice attribute values.
// Any string longer than this value should be truncated to this length.
//
// Negative value means no limit should be applied.
func (r *Record) AttributeValueLengthLimit() int {
return r.attributeValueLengthLimit
}

// AttributeCountLimit is the maximum allowed log record attribute count. Any
// attribute added to a log record once this limit is reached should be dropped.
//
// Zero means no attributes should be recorded.
//
// Negative value means no limit should be applied.
func (r *Record) AttributeCountLimit() int {
return r.attributeCountLimit
}

// Clone returns a copy of the record with no shared state. The original record
// and the clone can both be modified without interfering with each other.
func (r *Record) Clone() Record {
Expand Down
Loading

0 comments on commit dbe27d4

Please sign in to comment.