From dbe27d4147b9c5bdf3e2a828f469c1eb463de2ed Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Tue, 16 Apr 2024 11:48:17 -0700 Subject: [PATCH] Replace `Record` limit methods with `DroppedAttributes` (#5190) * Replace Record lim methods with DroppedAttributes * Add changelog entry * Add TestRecordDroppedAttributes * Add TestRecordCompactAttr * Add an indexPool * Fix gramatical error * Apply feedback Reduce indentation level. * Apply feedback Comment compactAttr and deduplicate. * Deduplicate all attributes when added * Comment why head is not used * Clarify comments * Move TestAllocationLimits to new file Do not run this test when the race detector is on. * Comment follow-up task --- CHANGELOG.md | 7 ++ exporters/stdout/stdoutlog/record.go | 6 +- sdk/log/logger_norace_test.go | 46 +++++++ sdk/log/logger_test.go | 25 ---- sdk/log/record.go | 174 ++++++++++++++++++++++++--- sdk/log/record_test.go | 152 ++++++++++++++++++++--- 6 files changed, 347 insertions(+), 63 deletions(-) create mode 100644 sdk/log/logger_norace_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index d42a24eb9a3..1c81bbb3fa7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,12 +11,19 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Added - Add `Recorder` in `go.opentelemetry.io/otel/log/logtest` to facilitate testing the log bridge implementations. (#5134) +- The `DroppedAttributes` is added to the `"go.opentelemetry.io/otel/sdk/log".Record` type. + This method can be used to determine how many log attributes were dropped from the `Record` due to limits being exceeded. (#5190) - Add span flags to OTLP spans and links exported by `go.opentelemetry.io/otel/exporters/otlp/otlptrace`. (#5194) ### Changed - Update `go.opentelemetry.io/proto/otlp` from v1.1.0 to v1.2.0. (#5177) +### Removed + +- The `AttributeCountLimit` on the `"go.opentelemetry.io/otel/sdk/log".Record` type is removed. (#5190) +- The `AttributeValueLengthLimit` on the `"go.opentelemetry.io/otel/sdk/log".Record` type is removed. (#5190) + ## [1.25.0/0.47.0/0.0.8/0.1.0-alpha] 2024-04-05 ### Added diff --git a/exporters/stdout/stdoutlog/record.go b/exporters/stdout/stdoutlog/record.go index a7caea79ca6..134fffbe61c 100644 --- a/exporters/stdout/stdoutlog/record.go +++ b/exporters/stdout/stdoutlog/record.go @@ -42,10 +42,8 @@ func (e *Exporter) newRecordJSON(r sdklog.Record) recordJSON { Attributes: make([]log.KeyValue, 0, r.AttributesLen()), - Resource: r.Resource(), - Scope: r.InstrumentationScope(), - AttributeValueLengthLimit: r.AttributeValueLengthLimit(), - AttributeCountLimit: r.AttributeCountLimit(), + Resource: r.Resource(), + Scope: r.InstrumentationScope(), } r.WalkAttributes(func(kv log.KeyValue) bool { diff --git a/sdk/log/logger_norace_test.go b/sdk/log/logger_norace_test.go new file mode 100644 index 00000000000..a7a9aaebdb9 --- /dev/null +++ b/sdk/log/logger_norace_test.go @@ -0,0 +1,46 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//go:build !race + +package log + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "go.opentelemetry.io/otel/log" + "go.opentelemetry.io/otel/sdk/instrumentation" +) + +func TestAllocationLimits(t *testing.T) { + // This test is not run with a race detector. The sync.Pool used by parts + // of the SDK has memory optimizations removed for the race detector. Do + // not test performance of the SDK in that state. + + const runs = 10 + + logger := newLogger(NewLoggerProvider(), instrumentation.Scope{}) + + r := log.Record{} + r.SetTimestamp(time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)) + r.SetObservedTimestamp(time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)) + r.SetBody(log.StringValue("testing body value")) + r.SetSeverity(log.SeverityInfo) + r.SetSeverityText("testing text") + + r.AddAttributes( + log.String("k1", "str"), + log.Float64("k2", 1.0), + log.Int("k3", 2), + log.Bool("k4", true), + log.Bytes("k5", []byte{1}), + ) + + assert.Equal(t, 0.0, testing.AllocsPerRun(runs, func() { + logger.newRecord(context.Background(), r) + }), "newRecord") +} diff --git a/sdk/log/logger_test.go b/sdk/log/logger_test.go index b966f7614f3..6443bf77d71 100644 --- a/sdk/log/logger_test.go +++ b/sdk/log/logger_test.go @@ -273,28 +273,3 @@ func TestLoggerEnabled(t *testing.T) { }) } } - -func TestAllocationLimits(t *testing.T) { - const runs = 10 - - logger := newLogger(NewLoggerProvider(), instrumentation.Scope{}) - - r := log.Record{} - r.SetTimestamp(time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)) - r.SetObservedTimestamp(time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)) - r.SetBody(log.StringValue("testing body value")) - r.SetSeverity(log.SeverityInfo) - r.SetSeverityText("testing text") - - r.AddAttributes( - log.String("k1", "str"), - log.Float64("k2", 1.0), - log.Int("k3", 2), - log.Bool("k4", true), - log.Bytes("k5", []byte{1}), - ) - - assert.Equal(t, 0.0, testing.AllocsPerRun(runs, func() { - logger.newRecord(context.Background(), r) - }), "newRecord") -} diff --git a/sdk/log/record.go b/sdk/log/record.go index 33e1c86c74f..74ae2888e74 100644 --- a/sdk/log/record.go +++ b/sdk/log/record.go @@ -5,6 +5,7 @@ package log // import "go.opentelemetry.io/otel/sdk/log" import ( "slices" + "sync" "time" "go.opentelemetry.io/otel/log" @@ -19,6 +20,20 @@ import ( // cover 95% of all use-cases (https://go.dev/blog/slog#performance). const attributesInlineCount = 5 +// indexPool is a pool of index maps used for de-duplication. +var indexPool = sync.Pool{ + New: func() any { return make(map[string]int) }, +} + +func getIndex() map[string]int { + return indexPool.Get().(map[string]int) +} + +func putIndex(index map[string]int) { + clear(index) + indexPool.Put(index) +} + // Record is a log record emitted by the Logger. type Record struct { // Do not embed the log.Record. Attributes need to be overwrite-able and @@ -48,6 +63,10 @@ type Record struct { // - Unused array elements are zero-ed. Used to detect mistakes. back []log.KeyValue + // dropped is the count of attributes that have been dropped when limits + // were reached. + dropped int + traceID trace.TraceID spanID trace.SpanID traceFlags trace.TraceFlags @@ -131,6 +150,99 @@ func (r *Record) WalkAttributes(f func(log.KeyValue) bool) { // AddAttributes adds attributes to the log record. func (r *Record) AddAttributes(attrs ...log.KeyValue) { + n := r.AttributesLen() + if n == 0 { + // Avoid the more complex duplicate map lookups bellow. + attrs, r.dropped = dedup(attrs) + + var drop int + attrs, drop = head(attrs, r.attributeCountLimit) + r.dropped += drop + + r.addAttrs(attrs) + return + } + + // Used to find duplicates between attrs and existing attributes in r. + rIndex := r.attrIndex() + defer putIndex(rIndex) + + // Unique attrs that need to be added to r. This uses the same underlying + // array as attrs. + // + // Note, do not iterate attrs twice by just calling dedup(attrs) here. + unique := attrs[:0] + // Used to find duplicates within attrs itself. The index value is the + // index of the element in unique. + uIndex := getIndex() + defer putIndex(uIndex) + + // Deduplicate attrs within the scope of all existing attributes. + for _, a := range attrs { + // Last-value-wins for any duplicates in attrs. + idx, found := uIndex[a.Key] + if found { + r.dropped++ + unique[idx] = a + continue + } + + idx, found = rIndex[a.Key] + if found { + // New attrs overwrite any existing with the same key. + r.dropped++ + if idx < 0 { + r.front[-(idx + 1)] = a + } else { + r.back[idx] = a + } + } else { + // Unique attribute. + // TODO: apply truncation to string and []string values. + // TODO: deduplicate map values. + unique = append(unique, a) + uIndex[a.Key] = len(unique) - 1 + } + } + attrs = unique + + if r.attributeCountLimit > 0 && n+len(attrs) > r.attributeCountLimit { + // Truncate the now unique attributes to comply with limit. + // + // Do not use head(attrs, r.attributeCountLimit - n) here. If + // (r.attributeCountLimit - n) <= 0 attrs needs to be emptied. + last := max(0, (r.attributeCountLimit - n)) + r.dropped += len(attrs) - last + attrs = attrs[:last] + } + + r.addAttrs(attrs) +} + +// attrIndex returns an index map for all attributes in the Record r. The index +// maps the attribute key to location the attribute is stored. If the value is +// < 0 then -(value + 1) (e.g. -1 -> 0, -2 -> 1, -3 -> 2) represents the index +// in r.nFront. Otherwise, the index is the exact index of r.back. +// +// The returned index is taken from the indexPool. It is the callers +// responsibility to return the index to that pool (putIndex) when done. +func (r *Record) attrIndex() map[string]int { + index := getIndex() + for i := 0; i < r.nFront; i++ { + key := r.front[i].Key + index[key] = -i - 1 // stored in front: negative index. + } + for i := 0; i < len(r.back); i++ { + key := r.back[i].Key + index[key] = i // stored in back: positive index. + } + return index +} + +// addAttrs adds attrs to the Record r. This does not validate any limits or +// duplication of attributes, these tasks are left to the caller to handle +// prior to calling. +func (r *Record) addAttrs(attrs []log.KeyValue) { var i int for i = 0; i < len(attrs) && r.nFront < len(r.front); i++ { a := attrs[i] @@ -144,6 +256,14 @@ func (r *Record) AddAttributes(attrs ...log.KeyValue) { // SetAttributes sets (and overrides) attributes to the log record. func (r *Record) SetAttributes(attrs ...log.KeyValue) { + // TODO: apply truncation to string and []string values. + // TODO: deduplicate map values. + attrs, r.dropped = dedup(attrs) + + var drop int + attrs, drop = head(attrs, r.attributeCountLimit) + r.dropped += drop + r.nFront = 0 var i int for i = 0; i < len(attrs) && r.nFront < len(r.front); i++ { @@ -155,11 +275,45 @@ func (r *Record) SetAttributes(attrs ...log.KeyValue) { r.back = slices.Clone(attrs[i:]) } +// head returns the first n values of kvs along with the number of elements +// dropped. If n is less than or equal to zero, kvs is returned with 0. +func head(kvs []log.KeyValue, n int) (out []log.KeyValue, dropped int) { + if n > 0 && len(kvs) > n { + return kvs[:n], len(kvs) - n + } + return kvs, 0 +} + +// dedup deduplicates kvs front-to-back with the last value saved. +func dedup(kvs []log.KeyValue) (unique []log.KeyValue, dropped int) { + index := getIndex() + defer putIndex(index) + + unique = kvs[:0] // Use the same underlying array as kvs. + for _, a := range kvs { + idx, found := index[a.Key] + if found { + dropped++ + unique[idx] = a + } else { + unique = append(unique, a) + index[a.Key] = len(unique) - 1 + } + } + return unique, dropped +} + // AttributesLen returns the number of attributes in the log record. func (r *Record) AttributesLen() int { return r.nFront + len(r.back) } +// DroppedAttributes returns the number of attributes dropped due to limits +// being reached. +func (r *Record) DroppedAttributes() int { + return r.dropped +} + // TraceID returns the trace ID or empty array. func (r *Record) TraceID() trace.TraceID { return r.traceID @@ -206,26 +360,6 @@ func (r *Record) InstrumentationScope() instrumentation.Scope { return *r.scope } -// AttributeValueLengthLimit is the maximum allowed attribute value length. -// -// This limit only applies to string and string slice attribute values. -// Any string longer than this value should be truncated to this length. -// -// Negative value means no limit should be applied. -func (r *Record) AttributeValueLengthLimit() int { - return r.attributeValueLengthLimit -} - -// AttributeCountLimit is the maximum allowed log record attribute count. Any -// attribute added to a log record once this limit is reached should be dropped. -// -// Zero means no attributes should be recorded. -// -// Negative value means no limit should be applied. -func (r *Record) AttributeCountLimit() int { - return r.attributeCountLimit -} - // Clone returns a copy of the record with no shared state. The original record // and the clone can both be modified without interfering with each other. func (r *Record) Clone() Record { diff --git a/sdk/log/record_test.go b/sdk/log/record_test.go index 0ccdfea0787..99adfdfa9e9 100644 --- a/sdk/log/record_test.go +++ b/sdk/log/record_test.go @@ -4,6 +4,7 @@ package log import ( + "strconv" "testing" "time" @@ -125,20 +126,6 @@ func TestRecordInstrumentationScope(t *testing.T) { assert.Equal(t, scope, r.InstrumentationScope()) } -func TestRecordAttributeValueLengthLimit(t *testing.T) { - limit := 12 - r := new(Record) - r.attributeValueLengthLimit = limit - assert.Equal(t, limit, r.AttributeValueLengthLimit()) -} - -func TestRecordAttributeCountLimit(t *testing.T) { - limit := 21 - r := new(Record) - r.attributeCountLimit = limit - assert.Equal(t, limit, r.AttributeCountLimit()) -} - func TestRecordClone(t *testing.T) { now0 := time.Now() sev0 := log.SeverityInfo @@ -204,3 +191,140 @@ func TestRecordClone(t *testing.T) { return assert.Truef(t, kv.Equal(attr1), "%v != %v", kv, attr1) }) } + +func TestRecordDroppedAttributes(t *testing.T) { + for i := 1; i < attributesInlineCount*5; i++ { + r := new(Record) + r.attributeCountLimit = 1 + + attrs := make([]log.KeyValue, i) + attrs[0] = log.Bool("only key different then the rest", true) + r.AddAttributes(attrs...) + assert.Equalf(t, i-1, r.DroppedAttributes(), "%d: AddAttributes", i) + + r.AddAttributes(attrs...) + assert.Equalf(t, 2*i-1, r.DroppedAttributes(), "%d: second AddAttributes", i) + + r.SetAttributes(attrs...) + assert.Equalf(t, i-1, r.DroppedAttributes(), "%d: SetAttributes", i) + } +} + +func TestRecordAttrDeduplication(t *testing.T) { + testcases := []struct { + name string + attrs []log.KeyValue + want []log.KeyValue + }{ + { + name: "EmptyKey", + attrs: make([]log.KeyValue, 10), + want: make([]log.KeyValue, 1), + }, + { + name: "NonEmptyKey", + attrs: []log.KeyValue{ + log.Bool("key", true), + log.Int64("key", 1), + log.Bool("key", false), + log.Float64("key", 2.), + log.String("key", "3"), + log.Slice("key", log.Int64Value(4)), + log.Map("key", log.Int("key", 5)), + log.Bytes("key", []byte("six")), + log.Bool("key", false), + }, + want: []log.KeyValue{ + log.Bool("key", false), + }, + }, + { + name: "Multiple", + attrs: []log.KeyValue{ + log.Bool("a", true), + log.Int64("b", 1), + log.Bool("a", false), + log.Float64("c", 2.), + log.String("b", "3"), + log.Slice("d", log.Int64Value(4)), + log.Map("a", log.Int("key", 5)), + log.Bytes("d", []byte("six")), + log.Bool("e", true), + log.Int("f", 1), + log.Int("f", 2), + log.Int("f", 3), + log.Float64("b", 0.0), + log.Float64("b", 0.0), + log.String("g", "G"), + log.String("h", "H"), + log.String("g", "GG"), + log.Bool("a", false), + }, + want: []log.KeyValue{ + // Order is important here. + log.Bool("a", false), + log.Float64("b", 0.0), + log.Float64("c", 2.), + log.Bytes("d", []byte("six")), + log.Bool("e", true), + log.Int("f", 3), + log.String("g", "GG"), + log.String("h", "H"), + }, + }, + { + name: "NoDuplicate", + attrs: func() []log.KeyValue { + out := make([]log.KeyValue, attributesInlineCount*2) + for i := range out { + out[i] = log.Bool(strconv.Itoa(i), true) + } + return out + }(), + want: func() []log.KeyValue { + out := make([]log.KeyValue, attributesInlineCount*2) + for i := range out { + out[i] = log.Bool(strconv.Itoa(i), true) + } + return out + }(), + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + validate := func(t *testing.T, r *Record) { + t.Helper() + + var i int + r.WalkAttributes(func(kv log.KeyValue) bool { + if assert.Lessf(t, i, len(tc.want), "additional: %v", kv) { + want := tc.want[i] + assert.Truef(t, kv.Equal(want), "%d: want %v, got %v", i, want, kv) + } + i++ + return true + }) + } + + t.Run("SetAttributes", func(t *testing.T) { + r := new(Record) + r.SetAttributes(tc.attrs...) + validate(t, r) + }) + + t.Run("AddAttributes/Empty", func(t *testing.T) { + r := new(Record) + r.AddAttributes(tc.attrs...) + validate(t, r) + }) + + t.Run("AddAttributes/Duplicates", func(t *testing.T) { + r := new(Record) + r.AddAttributes(tc.attrs...) + r.AddAttributes(tc.attrs...) + validate(t, r) + }) + }) + } +}