Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace Record limit methods with DroppedAttributes #5190

Merged
merged 18 commits into from
Apr 16, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,18 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Added

- Add `Recorder` in `go.opentelemetry.io/otel/log/logtest` to facilitate testing the log bridge implementations. (#5134)
- The `DroppedAttributes` is added to the `"go.opentelemetry.io/otel/sdk/log".Record` type.
This method can be used to determine how many log attributes were dropped from the `Record` due to limits being exceeded. (#5190)

### Changed

- Update `go.opentelemetry.io/proto/otlp` from v1.1.0 to v1.2.0. (#5177)

### Removed

- The `AttributeCountLimit` on the `"go.opentelemetry.io/otel/sdk/log".Record` type is removed. (#5190)
- The `AttributeValueLengthLimit` on the `"go.opentelemetry.io/otel/sdk/log".Record` type is removed. (#5190)

## [1.25.0/0.47.0/0.0.8/0.1.0-alpha] 2024-04-05

### Added
Expand Down
6 changes: 2 additions & 4 deletions exporters/stdout/stdoutlog/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,8 @@ func (e *Exporter) newRecordJSON(r sdklog.Record) recordJSON {

Attributes: make([]log.KeyValue, 0, r.AttributesLen()),

Resource: r.Resource(),
Scope: r.InstrumentationScope(),
AttributeValueLengthLimit: r.AttributeValueLengthLimit(),
AttributeCountLimit: r.AttributeCountLimit(),
Resource: r.Resource(),
Scope: r.InstrumentationScope(),
}

r.WalkAttributes(func(kv log.KeyValue) bool {
Expand Down
160 changes: 140 additions & 20 deletions sdk/log/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package log // import "go.opentelemetry.io/otel/sdk/log"

import (
"slices"
"sync"
"time"

"go.opentelemetry.io/otel/log"
Expand All @@ -19,6 +20,20 @@ import (
// cover 95% of all use-cases (https://go.dev/blog/slog#performance).
const attributesInlineCount = 5

// indexPool is a pool of index maps used for de-duplication.
var indexPool = sync.Pool{
New: func() any { return make(map[string]int) },
}

func getIndex() map[string]int {
return indexPool.Get().(map[string]int)
}

func putIndex(index map[string]int) {
clear(index)
indexPool.Put(index)
}

// Record is a log record emitted by the Logger.
type Record struct {
// Do not embed the log.Record. Attributes need to be overwrite-able and
Expand Down Expand Up @@ -48,6 +63,10 @@ type Record struct {
// - Unused array elements are zero-ed. Used to detect mistakes.
back []log.KeyValue

// dropped is the count of attributes that have been dropped when limits
// were reached.
dropped int
MrAlias marked this conversation as resolved.
Show resolved Hide resolved

traceID trace.TraceID
spanID trace.SpanID
traceFlags trace.TraceFlags
Expand Down Expand Up @@ -131,6 +150,26 @@ func (r *Record) WalkAttributes(f func(log.KeyValue) bool) {

// AddAttributes adds attributes to the log record.
func (r *Record) AddAttributes(attrs ...log.KeyValue) {
if r.attributeCountLimit > 0 {
if r.AttributesLen()+len(attrs) > r.attributeCountLimit {
r.compactAttr()
// TODO: apply truncation to string and []string values.
var dropped int
attrs, dropped = deduplicate(attrs)
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
r.dropped += dropped

if n := r.AttributesLen(); n+len(attrs) > r.attributeCountLimit {
last := max(0, (r.attributeCountLimit - n))
r.dropped += len(attrs) - last
attrs = attrs[:last]
}
}
}
MrAlias marked this conversation as resolved.
Show resolved Hide resolved

r.addAttributes(attrs)
}

func (r *Record) addAttributes(attrs []log.KeyValue) {
var i int
for i = 0; i < len(attrs) && r.nFront < len(r.front); i++ {
a := attrs[i]
Expand All @@ -142,8 +181,103 @@ func (r *Record) AddAttributes(attrs ...log.KeyValue) {
r.back = append(r.back, attrs[i:]...)
}

func (r *Record) compactAttr() {
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
// index holds the location of attributes in the record based on the
// attribute key. If the value stored is < 0 the -(value + 1) (e.g. -1 ->
// 0, -2 -> 1, -3 -> 2) represents the index in r.nFront. Otherwise, the
// index is the exact index of r.back.
index := getIndex()
defer putIndex(index)

var dropped int
var cursor int
for i := 0; i < r.nFront; i++ {
key := r.front[i].Key
idx, found := index[key]
if found {
dropped++
r.front[-(idx + 1)] = r.front[i]
} else {
r.front[cursor] = r.front[i]
index[key] = -cursor - 1 // stored in front: negative index.
cursor++
}
}
r.nFront -= dropped

// Compact back storage into front.
for cursor < attributesInlineCount && len(r.back) > 0 {
key := r.back[0].Key
idx, found := index[key]
if found {
dropped++
r.front[-(idx + 1)] = r.back[0]
} else {
r.front[cursor] = r.back[0]
r.nFront++

index[key] = -cursor - 1 // stored in front: negative index.
cursor++
}
r.back = r.back[1:]
}

for i := 0; i < len(r.back); i++ {
key := r.back[i].Key
idx, found := index[key]
if found {
dropped++
if idx < 0 {
r.front[-(idx + 1)] = r.back[i]
} else {
r.back[idx] = r.back[i]
}
r.back = append(r.back[:i], r.back[i+1:]...)
i--
} else {
index[key] = i // stored in back: positive index.
}
}

r.dropped += dropped
}

// SetAttributes sets (and overrides) attributes to the log record.
func (r *Record) SetAttributes(attrs ...log.KeyValue) {
// If adding these attributes could exceed limit, de-duplicate to minimize
dashpole marked this conversation as resolved.
Show resolved Hide resolved
// overflow.
if r.attributeCountLimit > 0 && len(attrs) > r.attributeCountLimit {
// TODO: apply truncation to string and []string values.
attrs, r.dropped = deduplicate(attrs)
if len(attrs) > r.attributeCountLimit {
r.dropped += len(attrs) - r.attributeCountLimit
attrs = attrs[:r.attributeCountLimit]
}
}

r.setAttributes(attrs)
}

func deduplicate(kvs []log.KeyValue) (unique []log.KeyValue, dropped int) {
unique = kvs[:0]

index := getIndex()
defer putIndex(index)

for _, a := range kvs {
idx, found := index[a.Key]
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
if found {
dropped++
unique[idx] = a
} else {
unique = append(unique, a)
index[a.Key] = len(unique) - 1
}
}
return unique, dropped
}

func (r *Record) setAttributes(attrs []log.KeyValue) {
r.nFront = 0
var i int
for i = 0; i < len(attrs) && r.nFront < len(r.front); i++ {
Expand All @@ -160,6 +294,12 @@ func (r *Record) AttributesLen() int {
return r.nFront + len(r.back)
}

// DroppedAttributes returns the number of attributes dropped due to limits
// being reached.
func (r *Record) DroppedAttributes() int {
return r.dropped
}

// TraceID returns the trace ID or empty array.
func (r *Record) TraceID() trace.TraceID {
return r.traceID
Expand Down Expand Up @@ -206,26 +346,6 @@ func (r *Record) InstrumentationScope() instrumentation.Scope {
return *r.scope
}

// AttributeValueLengthLimit is the maximum allowed attribute value length.
//
// This limit only applies to string and string slice attribute values.
// Any string longer than this value should be truncated to this length.
//
// Negative value means no limit should be applied.
func (r *Record) AttributeValueLengthLimit() int {
return r.attributeValueLengthLimit
}

// AttributeCountLimit is the maximum allowed log record attribute count. Any
// attribute added to a log record once this limit is reached should be dropped.
//
// Zero means no attributes should be recorded.
//
// Negative value means no limit should be applied.
func (r *Record) AttributeCountLimit() int {
return r.attributeCountLimit
}

// Clone returns a copy of the record with no shared state. The original record
// and the clone can both be modified without interfering with each other.
func (r *Record) Clone() Record {
Expand Down
130 changes: 116 additions & 14 deletions sdk/log/record_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package log

import (
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -125,20 +126,6 @@ func TestRecordInstrumentationScope(t *testing.T) {
assert.Equal(t, scope, r.InstrumentationScope())
}

func TestRecordAttributeValueLengthLimit(t *testing.T) {
limit := 12
r := new(Record)
r.attributeValueLengthLimit = limit
assert.Equal(t, limit, r.AttributeValueLengthLimit())
}

func TestRecordAttributeCountLimit(t *testing.T) {
limit := 21
r := new(Record)
r.attributeCountLimit = limit
assert.Equal(t, limit, r.AttributeCountLimit())
}

func TestRecordClone(t *testing.T) {
now0 := time.Now()
sev0 := log.SeverityInfo
Expand Down Expand Up @@ -204,3 +191,118 @@ func TestRecordClone(t *testing.T) {
return assert.Truef(t, kv.Equal(attr1), "%v != %v", kv, attr1)
})
}

func TestRecordDroppedAttributes(t *testing.T) {
for i := 1; i < attributesInlineCount*5; i++ {
r := new(Record)
r.attributeCountLimit = 1

attrs := make([]log.KeyValue, i)
attrs[0] = log.Bool("only key different then the rest", true)
r.AddAttributes(attrs...)
assert.Equalf(t, i-1, r.DroppedAttributes(), "%d: AddAttributes", i)

r.SetAttributes(attrs...)
assert.Equalf(t, i-1, r.DroppedAttributes(), "%d: SetAttributes", i)
}
}

func TestRecordCompactAttr(t *testing.T) {
testcases := []struct {
name string
attrs []log.KeyValue
want []log.KeyValue
}{
{
name: "EmptyKey",
attrs: make([]log.KeyValue, 10),
want: make([]log.KeyValue, 1),
},
{
name: "NonEmptyKey",
attrs: []log.KeyValue{
log.Bool("key", true),
log.Int64("key", 1),
log.Bool("key", false),
log.Float64("key", 2.),
log.String("key", "3"),
log.Slice("key", log.Int64Value(4)),
log.Map("key", log.Int("key", 5)),
log.Bytes("key", []byte("six")),
log.Bool("key", false),
},
want: []log.KeyValue{
log.Bool("key", false),
},
},
{
name: "Multiple",
attrs: []log.KeyValue{
log.Bool("a", true),
log.Int64("b", 1),
log.Bool("a", false),
log.Float64("c", 2.),
log.String("b", "3"),
log.Slice("d", log.Int64Value(4)),
log.Map("a", log.Int("key", 5)),
log.Bytes("d", []byte("six")),
log.Bool("e", true),
log.Int("f", 1),
log.Int("f", 2),
log.Int("f", 3),
log.Float64("b", 0.0),
log.Float64("b", 0.0),
log.String("g", "G"),
log.String("h", "H"),
log.String("g", "GG"),
log.Bool("a", false),
},
want: []log.KeyValue{
// Order is important here.
log.Bool("a", false),
log.Float64("b", 0.0),
log.Float64("c", 2.),
log.Bytes("d", []byte("six")),
log.Bool("e", true),
log.Int("f", 3),
log.String("g", "GG"),
log.String("h", "H"),
},
},
{
name: "NoDuplicate",
attrs: func() []log.KeyValue {
out := make([]log.KeyValue, attributesInlineCount*2)
for i := range out {
out[i] = log.Bool(strconv.Itoa(i), true)
}
return out
}(),
want: func() []log.KeyValue {
out := make([]log.KeyValue, attributesInlineCount*2)
for i := range out {
out[i] = log.Bool(strconv.Itoa(i), true)
}
return out
}(),
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
r := new(Record)
r.setAttributes(tc.attrs)
r.compactAttr()

var i int
r.WalkAttributes(func(kv log.KeyValue) bool {
if assert.Lessf(t, i, len(tc.want), "additional: %v", kv) {
want := tc.want[i]
assert.Truef(t, kv.Equal(want), "%d: want %v, got %v", i, want, kv)
}
i++
return true
})
})
}
}