sdk/log: Processor.OnEmit accepts a Record pointer (#5636)
## What

Change `Processor.OnEmit` methods to accept a `Record` pointer.
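
For reference, a minimal sketch of the signature change on the `Processor` interface (only `OnEmit` changes; parameter names are illustrative):

```go
// Before this change:
OnEmit(ctx context.Context, record Record) error

// After this change, the record is passed by pointer so that mutations made
// by one processor are visible to the processors registered after it:
OnEmit(ctx context.Context, record *Record) error
```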

## Why

Fixes #5219

This design is specification compliant according to the discussions around
open-telemetry/opentelemetry-specification#4067.

It is also in line with how the Go span processors work and how log
processors work in other languages.
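
To illustrate the behavioral effect, here is a hedged sketch (the mutating processor mirrors the benchmark helper added in this PR; the wiring and names are illustrative, not prescribed by the SDK) of a mutation made by an earlier registered processor being observed by the processors registered after it:

```go
package main

import (
	"context"
	"time"

	"go.opentelemetry.io/otel/exporters/stdout/stdoutlog"
	logapi "go.opentelemetry.io/otel/log"
	sdklog "go.opentelemetry.io/otel/sdk/log"
)

// timestampProcessor mutates the record in place. Because OnEmit now receives
// a *Record, the mutation propagates to subsequently registered processors.
type timestampProcessor struct{}

func (timestampProcessor) OnEmit(_ context.Context, r *sdklog.Record) error {
	r.SetObservedTimestamp(time.Date(1988, time.November, 17, 0, 0, 0, 0, time.UTC))
	return nil
}

func (timestampProcessor) Enabled(context.Context, sdklog.Record) bool { return true }
func (timestampProcessor) Shutdown(context.Context) error              { return nil }
func (timestampProcessor) ForceFlush(context.Context) error            { return nil }

func main() {
	exporter, err := stdoutlog.New()
	if err != nil {
		panic(err)
	}
	provider := sdklog.NewLoggerProvider(
		// The simple processor (and its exporter) now observes the
		// timestamp set by timestampProcessor.
		sdklog.WithProcessor(timestampProcessor{}),
		sdklog.WithProcessor(sdklog.NewSimpleProcessor(exporter)),
	)
	defer func() { _ = provider.Shutdown(context.Background()) }()

	var rec logapi.Record
	rec.SetBody(logapi.StringValue("hello"))
	provider.Logger("example").Emit(context.Background(), rec)
}
```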

If the performance cost (an additional heap allocation during log processing)
turns out to be a significant problem for some users, we have a few
possible mitigations:

1. Utilize profile-guided optimization (PGO), which may also reduce heap
allocations (sources:
https://landontclipp.github.io/blog/2023/08/25/profile-guided-optimizations-in-go/#devirtualization,
https://andrewwphillips.github.io/blog/pgo.html). Currently it does not,
but I expect this may change in future Go releases.
2. Improve the Go compiler's escape analysis (related to the previous point).
3. Introduce a new "chaining processor", which may also be desirable in
other languages (a sketch follows this list).
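
A hedged sketch of option 3, assuming the `Processor` interface at this commit (`OnEmit`, `Enabled`, `Shutdown`, `ForceFlush`); the package and type names are hypothetical and not part of this PR or the SDK:

```go
// Package chain provides a hypothetical composite log processor: it forwards
// every record through an ordered list of sub-processors, so mutation and
// filtering can be composed explicitly in a single registered processor.
package chain

import (
	"context"
	"errors"

	sdklog "go.opentelemetry.io/otel/sdk/log"
)

// Processor forwards each call to its stages in order.
type Processor struct {
	stages []sdklog.Processor
}

func New(stages ...sdklog.Processor) Processor {
	return Processor{stages: stages}
}

func (c Processor) OnEmit(ctx context.Context, r *sdklog.Record) error {
	for _, s := range c.stages {
		// Each stage may mutate r; later stages observe the mutation.
		if err := s.OnEmit(ctx, r); err != nil {
			return err
		}
	}
	return nil
}

func (c Processor) Enabled(ctx context.Context, r sdklog.Record) bool {
	// The chain is enabled if any stage is enabled.
	for _, s := range c.stages {
		if s.Enabled(ctx, r) {
			return true
		}
	}
	return false
}

func (c Processor) Shutdown(ctx context.Context) error {
	var err error
	for _, s := range c.stages {
		err = errors.Join(err, s.Shutdown(ctx))
	}
	return err
}

func (c Processor) ForceFlush(ctx context.Context) error {
	var err error
	for _, s := range c.stages {
		err = errors.Join(err, s.ForceFlush(ctx))
	}
	return err
}
```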

## Benchstat

`old` is from `main`.
`new` is from the current branch.

`new-pgo` is from the current branch built with PGO. I first ran the
benchmarks to generate a CPU profile using `go test -run=^$ -bench=.
-count=10 -cpuprofile default.pgo`, and then reran the benchmarks with PGO
enabled. Currently, the number of heap allocations is the same.

```
goos: linux
goarch: amd64
pkg: go.opentelemetry.io/otel/sdk/log
cpu: Intel(R) Core(TM) i9-10885H CPU @ 2.40GHz
                                  │    old.txt    │                new.txt                │              new-pgo.txt              │
                                  │    sec/op     │    sec/op     vs base                 │    sec/op     vs base                 │
BatchProcessorOnEmit-16              402.7n ± 18%   382.0n ±  7%         ~ (p=0.247 n=10)   376.7n ± 14%         ~ (p=0.210 n=10)
Processor/Simple-16                  350.9n ±  9%   782.5n ±  6%  +123.00% (p=0.000 n=10)   755.1n ±  5%  +115.19% (p=0.000 n=10)
Processor/Batch-16                   1.333µ ± 15%   1.497µ ± 11%   +12.27% (p=0.000 n=10)   1.528µ ±  8%   +14.63% (p=0.000 n=10)
Processor/SetTimestampSimple-16      329.5n ± 15%   711.6n ±  4%  +115.93% (p=0.000 n=10)   721.9n ±  5%  +119.04% (p=0.000 n=10)
Processor/SetTimestampBatch-16       1.163µ ±  2%   1.524µ ±  3%   +31.03% (p=0.000 n=10)   1.461µ ±  5%   +25.57% (p=0.000 n=10)
Processor/AddAttributesSimple-16     408.7n ±  3%   810.1n ±  4%   +98.21% (p=0.000 n=10)   830.1n ±  4%  +103.11% (p=0.000 n=10)
Processor/AddAttributesBatch-16      1.270µ ±  2%   1.623µ ±  4%   +27.71% (p=0.000 n=10)   1.597µ ±  7%   +25.66% (p=0.000 n=10)
Processor/SetAttributesSimple-16     436.2n ± 10%   796.1n ±  3%   +82.50% (p=0.000 n=10)   817.6n ±  4%   +87.43% (p=0.000 n=10)
Processor/SetAttributesBatch-16      1.202µ ±  2%   1.552µ ±  2%   +29.06% (p=0.000 n=10)   1.659µ ± 11%   +37.96% (p=0.000 n=10)
LoggerNewRecord/5_attributes-16      366.6n ±  3%   363.7n ±  7%         ~ (p=0.952 n=10)   426.2n ±  7%   +16.27% (p=0.000 n=10)
LoggerNewRecord/10_attributes-16     1.711µ ±  2%   1.909µ ± 18%   +11.57% (p=0.000 n=10)   2.077µ ± 10%   +21.39% (p=0.000 n=10)
LoggerProviderLogger-16              650.1n ±  4%   690.1n ±  8%    +6.15% (p=0.019 n=10)   737.6n ± 13%   +13.47% (p=0.004 n=10)
WalkAttributes/1_attributes-16       5.264n ± 12%   5.510n ±  8%         ~ (p=0.812 n=10)   5.865n ±  5%   +11.41% (p=0.011 n=10)
WalkAttributes/10_attributes-16      5.440n ±  8%   5.881n ±  7%    +8.12% (p=0.004 n=10)   6.104n ±  7%   +12.21% (p=0.005 n=10)
WalkAttributes/100_attributes-16     5.403n ±  9%   5.894n ±  9%    +9.10% (p=0.029 n=10)   5.783n ±  6%         ~ (p=0.052 n=10)
WalkAttributes/1000_attributes-16    5.196n ±  4%   5.860n ±  8%   +12.79% (p=0.000 n=10)   5.981n ± 13%   +15.13% (p=0.002 n=10)
SetAddAttributes/SetAttributes-16    181.2n ± 14%   208.1n ± 12%   +14.85% (p=0.005 n=10)   209.9n ± 11%   +15.87% (p=0.007 n=10)
SetAddAttributes/AddAttributes-16    156.7n ± 14%   161.1n ± 16%         ~ (p=0.190 n=10)   165.5n ± 15%         ~ (p=0.315 n=10)
SimpleProcessorOnEmit-16            11.775n ± 10%   9.027n ± 17%   -23.33% (p=0.000 n=10)   9.389n ± 18%   -20.26% (p=0.002 n=10)
geomean                              169.1n         209.6n         +23.98%                  215.5n         +27.48%

                        │     old.txt     │               new.txt                │              new-pgo.txt              │
                        │       B/s       │     B/s       vs base                │      B/s       vs base                │
BatchProcessorOnEmit-16   1004.39Mi ± 15%   79.88Mi ± 7%  -92.05% (p=0.000 n=10)   81.06Mi ± 12%  -91.93% (p=0.000 n=10)

                                  │   old.txt    │                new.txt                 │               new-pgo.txt               │
                                  │     B/op     │    B/op      vs base                   │     B/op      vs base                   │
BatchProcessorOnEmit-16             0.000 ± 0%      0.000 ± 0%         ~ (p=1.000 n=10) ¹    0.000 ±  0%         ~ (p=1.000 n=10) ¹
Processor/Simple-16                   0.0 ± 0%      417.0 ± 0%         ? (p=0.000 n=10)      417.0 ±  0%         ? (p=0.000 n=10)
Processor/Batch-16                  621.5 ± 2%     1057.5 ± 1%   +70.15% (p=0.000 n=10)     1064.5 ±  1%   +71.28% (p=0.000 n=10)
Processor/SetTimestampSimple-16       0.0 ± 0%      417.0 ± 0%         ? (p=0.000 n=10)      418.0 ±  0%         ? (p=0.000 n=10)
Processor/SetTimestampBatch-16      626.5 ± 3%     1049.5 ± 1%   +67.52% (p=0.000 n=10)     1057.5 ±  2%   +68.79% (p=0.000 n=10)
Processor/AddAttributesSimple-16      0.0 ± 0%      417.0 ± 0%         ? (p=0.000 n=10)      417.0 ±  0%         ? (p=0.000 n=10)
Processor/AddAttributesBatch-16     616.5 ± 3%     1053.0 ± 2%   +70.80% (p=0.000 n=10)     1048.5 ±  2%   +70.07% (p=0.000 n=10)
Processor/SetAttributesSimple-16    48.00 ± 0%     466.00 ± 0%  +870.83% (p=0.000 n=10)     466.00 ±  0%  +870.83% (p=0.000 n=10)
Processor/SetAttributesBatch-16     648.0 ± 3%     1089.5 ± 1%   +68.13% (p=0.000 n=10)     1087.5 ±  4%   +67.82% (p=0.000 n=10)
LoggerNewRecord/5_attributes-16     0.000 ± 0%      0.000 ± 0%         ~ (p=1.000 n=10) ¹    0.000 ±  0%         ~ (p=1.000 n=10) ¹
LoggerNewRecord/10_attributes-16    610.0 ± 0%      610.0 ± 0%         ~ (p=1.000 n=10) ¹    610.0 ±  0%         ~ (p=1.000 n=10) ¹
LoggerProviderLogger-16             354.5 ± 6%      368.0 ± 7%         ~ (p=0.288 n=10)      391.0 ± 29%         ~ (p=0.239 n=10)
WalkAttributes/1_attributes-16      0.000 ± 0%      0.000 ± 0%         ~ (p=1.000 n=10) ¹    0.000 ±  0%         ~ (p=1.000 n=10) ¹
WalkAttributes/10_attributes-16     0.000 ± 0%      0.000 ± 0%         ~ (p=1.000 n=10) ¹    0.000 ±  0%         ~ (p=1.000 n=10) ¹
WalkAttributes/100_attributes-16    0.000 ± 0%      0.000 ± 0%         ~ (p=1.000 n=10) ¹    0.000 ±  0%         ~ (p=1.000 n=10) ¹
WalkAttributes/1000_attributes-16   0.000 ± 0%      0.000 ± 0%         ~ (p=1.000 n=10) ¹    0.000 ±  0%         ~ (p=1.000 n=10) ¹
SetAddAttributes/SetAttributes-16   48.00 ± 0%      48.00 ± 0%         ~ (p=1.000 n=10) ¹    48.00 ±  0%         ~ (p=1.000 n=10) ¹
SetAddAttributes/AddAttributes-16   0.000 ± 0%      0.000 ± 0%         ~ (p=1.000 n=10) ¹    0.000 ±  0%         ~ (p=1.000 n=10) ¹
SimpleProcessorOnEmit-16            0.000 ± 0%      0.000 ± 0%         ~ (p=1.000 n=10) ¹    0.000 ±  0%         ~ (p=1.000 n=10) ¹
geomean                                        ²                ?                       ²                 ?                       ²
¹ all samples are equal
² summaries must be >0 to compute geomean

                                  │   old.txt    │                new.txt                │              new-pgo.txt              │
                                  │  allocs/op   │ allocs/op   vs base                   │ allocs/op   vs base                   │
BatchProcessorOnEmit-16             0.000 ± 0%     0.000 ± 0%         ~ (p=1.000 n=10) ¹   0.000 ± 0%         ~ (p=1.000 n=10) ¹
Processor/Simple-16                 0.000 ± 0%     1.000 ± 0%         ? (p=0.000 n=10)     1.000 ± 0%         ? (p=0.000 n=10)
Processor/Batch-16                  0.000 ± 0%     1.000 ± 0%         ? (p=0.000 n=10)     1.000 ± 0%         ? (p=0.000 n=10)
Processor/SetTimestampSimple-16     0.000 ± 0%     1.000 ± 0%         ? (p=0.000 n=10)     1.000 ± 0%         ? (p=0.000 n=10)
Processor/SetTimestampBatch-16      0.000 ± 0%     1.000 ± 0%         ? (p=0.000 n=10)     1.000 ± 0%         ? (p=0.000 n=10)
Processor/AddAttributesSimple-16    0.000 ± 0%     1.000 ± 0%         ? (p=0.000 n=10)     1.000 ± 0%         ? (p=0.000 n=10)
Processor/AddAttributesBatch-16     0.000 ± 0%     1.000 ± 0%         ? (p=0.000 n=10)     1.000 ± 0%         ? (p=0.000 n=10)
Processor/SetAttributesSimple-16    1.000 ± 0%     2.000 ± 0%  +100.00% (p=0.000 n=10)     2.000 ± 0%  +100.00% (p=0.000 n=10)
Processor/SetAttributesBatch-16     1.000 ± 0%     2.000 ± 0%  +100.00% (p=0.000 n=10)     2.000 ± 0%  +100.00% (p=0.000 n=10)
LoggerNewRecord/5_attributes-16     0.000 ± 0%     0.000 ± 0%         ~ (p=1.000 n=10) ¹   0.000 ± 0%         ~ (p=1.000 n=10) ¹
LoggerNewRecord/10_attributes-16    4.000 ± 0%     4.000 ± 0%         ~ (p=1.000 n=10) ¹   4.000 ± 0%         ~ (p=1.000 n=10) ¹
LoggerProviderLogger-16             1.000 ± 0%     1.000 ± 0%         ~ (p=1.000 n=10) ¹   1.000 ± 0%         ~ (p=1.000 n=10) ¹
WalkAttributes/1_attributes-16      0.000 ± 0%     0.000 ± 0%         ~ (p=1.000 n=10) ¹   0.000 ± 0%         ~ (p=1.000 n=10) ¹
WalkAttributes/10_attributes-16     0.000 ± 0%     0.000 ± 0%         ~ (p=1.000 n=10) ¹   0.000 ± 0%         ~ (p=1.000 n=10) ¹
WalkAttributes/100_attributes-16    0.000 ± 0%     0.000 ± 0%         ~ (p=1.000 n=10) ¹   0.000 ± 0%         ~ (p=1.000 n=10) ¹
WalkAttributes/1000_attributes-16   0.000 ± 0%     0.000 ± 0%         ~ (p=1.000 n=10) ¹   0.000 ± 0%         ~ (p=1.000 n=10) ¹
SetAddAttributes/SetAttributes-16   1.000 ± 0%     1.000 ± 0%         ~ (p=1.000 n=10) ¹   1.000 ± 0%         ~ (p=1.000 n=10) ¹
SetAddAttributes/AddAttributes-16   0.000 ± 0%     0.000 ± 0%         ~ (p=1.000 n=10) ¹   0.000 ± 0%         ~ (p=1.000 n=10) ¹
SimpleProcessorOnEmit-16            0.000 ± 0%     0.000 ± 0%         ~ (p=1.000 n=10) ¹   0.000 ± 0%         ~ (p=1.000 n=10) ¹
geomean                                        ²               ?                       ²               ?                       ²
¹ all samples are equal
² summaries must be >0 to compute geomean
```
pellared authored Aug 1, 2024
1 parent 2726c6a commit 8e8ad09
Showing 13 changed files with 175 additions and 129 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -17,6 +17,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
See our [versioning policy](VERSIONING.md) for more information about these stability guarantees. (#5629)
- Add `InstrumentationScope` field to `SpanStub` in `go.opentelemetry.io/otel/sdk/trace/tracetest`, as a replacement for the deprecated `InstrumentationLibrary`. (#5627)

### Changed

- `Processor.OnEmit` in `go.opentelemetry.io/otel/sdk/log` now accepts a pointer to `Record` instead of a value so that the record modifications done in a processor are propagated to subsequent registered processors. (#5636)

### Fixed

- Correct comments for the priority of the `WithEndpoint` and `WithEndpointURL` options and their corresponding environment variables in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp`. (#5584)
24 changes: 24 additions & 0 deletions sdk/log/DESIGN.md
@@ -146,7 +146,31 @@ provided via API.
Moreover it is safer to have these abstraction decoupled.
E.g. there can be a need for some fields that can be set via API and cannot be modified by the processors.

### Processor.OnEmit to accept Record values

There was a proposal to make the [Processor](#processor)'s `OnEmit`
accept a [Record](#record) value instead of a pointer, to reduce allocations
and to have a design similar to [`slog.Handler`](https://pkg.go.dev/log/slog#Handler).

There have been long discussions within the OpenTelemetry Specification SIG[^5]
about whether such a design would comply with the specification. The summary
was that the current processor design flaws are present in other languages as
well. Therefore, it would be favorable to introduce new processing concepts
(e.g. chaining processors) in the specification that would coexist with the
current "mutable" processor design.

The performance disadvantages caused by using a pointer (which at the time of
writing causes an additional heap allocation) may be mitigated by future
versions of the Go compiler, thanks to improved escape analysis and
profile-guided optimization (PGO)[^6].

On the other hand, [Processor](#processor)'s `Enabled` is fine to accept
a [Record](#record) value as the processors should not mutate the passed
parameters.

[^1]: [A Guide to the Go Garbage Collector](https://tip.golang.org/doc/gc-guide)
[^2]: [OpenTelemetry Logging](https://opentelemetry.io/docs/specs/otel/logs)
[^3]: [Conversation on representing LogRecordProcessor and LogRecordExporter via a single Expoter interface](https://github.com/open-telemetry/opentelemetry-go/pull/4954#discussion_r1515050480)
[^4]: [Introduce Processor](https://github.com/pellared/opentelemetry-go/pull/9)
[^5]: [Log record mutations do not have to be visible in next registered processors](https://github.com/open-telemetry/opentelemetry-specification/pull/4067)
[^6]: [Profile-guided optimization](https://go.dev/doc/pgo)
6 changes: 4 additions & 2 deletions sdk/log/batch.go
Expand Up @@ -176,11 +176,13 @@ func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) {
}

// OnEmit batches provided log record.
func (b *BatchProcessor) OnEmit(_ context.Context, r Record) error {
func (b *BatchProcessor) OnEmit(_ context.Context, r *Record) error {
if b.stopped.Load() || b.q == nil {
return nil
}
if n := b.q.Enqueue(r); n >= b.batchSize {
// The record is cloned so that changes done by subsequent processors
// are not going to lead to a data race.
if n := b.q.Enqueue(r.Clone()); n >= b.batchSize {
select {
case b.pollTrigger <- struct{}{}:
default:
36 changes: 18 additions & 18 deletions sdk/log/batch_test.go
@@ -45,9 +45,9 @@ func TestEmptyBatchConfig(t *testing.T) {
assert.NotPanics(t, func() {
var bp BatchProcessor
ctx := context.Background()
var record Record
record := new(Record)
assert.NoError(t, bp.OnEmit(ctx, record), "OnEmit")
assert.False(t, bp.Enabled(ctx, record), "Enabled")
assert.False(t, bp.Enabled(ctx, *record), "Enabled")
assert.NoError(t, bp.ForceFlush(ctx), "ForceFlush")
assert.NoError(t, bp.Shutdown(ctx), "Shutdown")
})
@@ -197,8 +197,8 @@ func TestBatchProcessor(t *testing.T) {
WithExportInterval(time.Nanosecond),
WithExportTimeout(time.Hour),
)
for _, r := range make([]Record, size) {
assert.NoError(t, b.OnEmit(ctx, r))
for i := 0; i < size; i++ {
assert.NoError(t, b.OnEmit(ctx, new(Record)))
}
var got []Record
assert.Eventually(t, func() bool {
@@ -220,8 +220,8 @@ func TestBatchProcessor(t *testing.T) {
WithExportInterval(time.Hour),
WithExportTimeout(time.Hour),
)
for _, r := range make([]Record, 10*batch) {
assert.NoError(t, b.OnEmit(ctx, r))
for i := 0; i < 10*batch; i++ {
assert.NoError(t, b.OnEmit(ctx, new(Record)))
}
assert.Eventually(t, func() bool {
return e.ExportN() > 1
@@ -243,8 +243,8 @@ func TestBatchProcessor(t *testing.T) {
WithExportInterval(time.Hour),
WithExportTimeout(time.Hour),
)
for _, r := range make([]Record, 2*batch) {
assert.NoError(t, b.OnEmit(ctx, r))
for i := 0; i < 2*batch; i++ {
assert.NoError(t, b.OnEmit(ctx, new(Record)))
}

var n int
@@ -255,7 +255,7 @@ func TestBatchProcessor(t *testing.T) {

var err error
require.Eventually(t, func() bool {
err = b.OnEmit(ctx, Record{})
err = b.OnEmit(ctx, new(Record))
return true
}, time.Second, time.Microsecond, "OnEmit blocked")
assert.NoError(t, err)
@@ -303,15 +303,15 @@ func TestBatchProcessor(t *testing.T) {
assert.NoError(t, b.Shutdown(ctx))

want := e.ExportN()
assert.NoError(t, b.OnEmit(ctx, Record{}))
assert.NoError(t, b.OnEmit(ctx, new(Record)))
assert.Equal(t, want, e.ExportN(), "Export called after shutdown")
})

t.Run("ForceFlush", func(t *testing.T) {
e := newTestExporter(nil)
b := NewBatchProcessor(e)

assert.NoError(t, b.OnEmit(ctx, Record{}))
assert.NoError(t, b.OnEmit(ctx, new(Record)))
assert.NoError(t, b.Shutdown(ctx))

assert.NoError(t, b.ForceFlush(ctx))
@@ -344,7 +344,7 @@ func TestBatchProcessor(t *testing.T) {
)
t.Cleanup(func() { _ = b.Shutdown(ctx) })

var r Record
r := new(Record)
r.SetBody(log.BoolValue(true))
require.NoError(t, b.OnEmit(ctx, r))

@@ -353,7 +353,7 @@ func TestBatchProcessor(t *testing.T) {
if assert.Equal(t, 1, e.ExportN(), "exporter Export calls") {
got := e.Records()
if assert.Len(t, got[0], 1, "records received") {
assert.Equal(t, r, got[0][0])
assert.Equal(t, *r, got[0][0])
}
}
})
@@ -381,7 +381,7 @@ func TestBatchProcessor(t *testing.T) {

// Enqueue 10 x "batch size" amount of records.
for i := 0; i < 10*batch; i++ {
require.NoError(t, b.OnEmit(ctx, Record{}))
require.NoError(t, b.OnEmit(ctx, new(Record)))
}
assert.Eventually(t, func() bool {
return e.ExportN() > 0 && len(b.exporter.input) == cap(b.exporter.input)
@@ -423,7 +423,7 @@ func TestBatchProcessor(t *testing.T) {
b := NewBatchProcessor(e)
t.Cleanup(func() { _ = b.Shutdown(ctx) })

var r Record
r := new(Record)
r.SetBody(log.BoolValue(true))
_ = b.OnEmit(ctx, r)
t.Cleanup(func() { _ = b.Shutdown(ctx) })
@@ -453,7 +453,7 @@ func TestBatchProcessor(t *testing.T) {
WithExportInterval(time.Hour),
WithExportTimeout(time.Hour),
)
var r Record
r := new(Record)
// First record will be blocked by testExporter.Export
assert.NoError(t, b.OnEmit(ctx, r), "exported record")
require.Eventually(t, func() bool {
@@ -497,7 +497,7 @@ func TestBatchProcessor(t *testing.T) {
case <-ctx.Done():
return
default:
assert.NoError(t, b.OnEmit(ctx, Record{}))
assert.NoError(t, b.OnEmit(ctx, new(Record)))
// Ignore partial flush errors.
_ = b.ForceFlush(ctx)
}
@@ -642,7 +642,7 @@ func TestQueue(t *testing.T) {
}

func BenchmarkBatchProcessorOnEmit(b *testing.B) {
var r Record
r := new(Record)
body := log.BoolValue(true)
r.SetBody(body)

113 changes: 79 additions & 34 deletions sdk/log/bench_test.go
@@ -16,59 +16,77 @@ import (
func BenchmarkProcessor(b *testing.B) {
for _, tc := range []struct {
name string
f func() Processor
f func() []LoggerProviderOption
}{
{
name: "Simple",
f: func() Processor {
return NewSimpleProcessor(noopExporter{})
f: func() []LoggerProviderOption {
return []LoggerProviderOption{WithProcessor(NewSimpleProcessor(noopExporter{}))}
},
},
{
name: "Batch",
f: func() Processor {
return NewBatchProcessor(noopExporter{})
f: func() []LoggerProviderOption {
return []LoggerProviderOption{WithProcessor(NewBatchProcessor(noopExporter{}))}
},
},
{
name: "SetTimestampSimple",
f: func() Processor {
return timestampDecorator{NewSimpleProcessor(noopExporter{})}
f: func() []LoggerProviderOption {
return []LoggerProviderOption{
WithProcessor(timestampProcessor{}),
WithProcessor(NewSimpleProcessor(noopExporter{})),
}
},
},
{
name: "SetTimestampBatch",
f: func() Processor {
return timestampDecorator{NewBatchProcessor(noopExporter{})}
f: func() []LoggerProviderOption {
return []LoggerProviderOption{
WithProcessor(timestampProcessor{}),
WithProcessor(NewBatchProcessor(noopExporter{})),
}
},
},
{
name: "AddAttributesSimple",
f: func() Processor {
return attrAddDecorator{NewSimpleProcessor(noopExporter{})}
f: func() []LoggerProviderOption {
return []LoggerProviderOption{
WithProcessor(attrAddProcessor{}),
WithProcessor(NewSimpleProcessor(noopExporter{})),
}
},
},
{
name: "AddAttributesBatch",
f: func() Processor {
return attrAddDecorator{NewBatchProcessor(noopExporter{})}
f: func() []LoggerProviderOption {
return []LoggerProviderOption{
WithProcessor(attrAddProcessor{}),
WithProcessor(NewBatchProcessor(noopExporter{})),
}
},
},
{
name: "SetAttributesSimple",
f: func() Processor {
return attrSetDecorator{NewSimpleProcessor(noopExporter{})}
f: func() []LoggerProviderOption {
return []LoggerProviderOption{
WithProcessor(attrSetDecorator{}),
WithProcessor(NewSimpleProcessor(noopExporter{})),
}
},
},
{
name: "SetAttributesBatch",
f: func() Processor {
return attrSetDecorator{NewBatchProcessor(noopExporter{})}
f: func() []LoggerProviderOption {
return []LoggerProviderOption{
WithProcessor(attrSetDecorator{}),
WithProcessor(NewBatchProcessor(noopExporter{})),
}
},
},
} {
b.Run(tc.name, func(b *testing.B) {
provider := NewLoggerProvider(WithProcessor(tc.f()))
provider := NewLoggerProvider(tc.f()...)
b.Cleanup(func() { assert.NoError(b, provider.Shutdown(context.Background())) })
logger := provider.Logger(b.Name())

@@ -91,32 +91,59 @@ func BenchmarkProcessor(b *testing.B) {
}
}

type timestampDecorator struct {
Processor
}
type timestampProcessor struct{}

func (e timestampDecorator) OnEmit(ctx context.Context, r Record) error {
r = r.Clone()
func (p timestampProcessor) OnEmit(ctx context.Context, r *Record) error {
r.SetObservedTimestamp(time.Date(1988, time.November, 17, 0, 0, 0, 0, time.UTC))
return e.Processor.OnEmit(ctx, r)
return nil
}

func (p timestampProcessor) Enabled(context.Context, Record) bool {
return true
}

type attrAddDecorator struct {
Processor
func (p timestampProcessor) Shutdown(ctx context.Context) error {
return nil
}

func (e attrAddDecorator) OnEmit(ctx context.Context, r Record) error {
r = r.Clone()
func (p timestampProcessor) ForceFlush(ctx context.Context) error {
return nil
}

type attrAddProcessor struct{}

func (p attrAddProcessor) OnEmit(ctx context.Context, r *Record) error {
r.AddAttributes(log.String("add", "me"))
return e.Processor.OnEmit(ctx, r)
return nil
}

type attrSetDecorator struct {
Processor
func (p attrAddProcessor) Enabled(context.Context, Record) bool {
return true
}

func (e attrSetDecorator) OnEmit(ctx context.Context, r Record) error {
r = r.Clone()
func (p attrAddProcessor) Shutdown(ctx context.Context) error {
return nil
}

func (p attrAddProcessor) ForceFlush(ctx context.Context) error {
return nil
}

type attrSetDecorator struct{}

func (p attrSetDecorator) OnEmit(ctx context.Context, r *Record) error {
r.SetAttributes(log.String("replace", "me"))
return e.Processor.OnEmit(ctx, r)
return nil
}

func (p attrSetDecorator) Enabled(context.Context, Record) bool {
return true
}

func (p attrSetDecorator) Shutdown(ctx context.Context) error {
return nil
}

func (p attrSetDecorator) ForceFlush(ctx context.Context) error {
return nil
}