Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BatchSpanProcessor that uses provided ScheduledExecutorService #3036

Merged
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions sdk-extensions/tracing-incubator/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ plugins {
`java-library`
`maven-publish`

id("me.champeau.gradle.jmh")
id("ru.vyarus.animalsniffer")
}

Expand All @@ -14,9 +15,33 @@ dependencies {
api(project(":api:all"))
api(project(":sdk:all"))

implementation(project(":api:metrics"))
implementation(project(":semconv"))

annotationProcessor("com.google.auto.value:auto-value")
testImplementation(project(":sdk:testing"))
testImplementation("com.google.guava:guava-testlib")

jmh(project(":sdk:metrics"))
jmh(project(":sdk:testing")) {
// JMH doesn"t handle dependencies that are duplicated between the main and jmh
// configurations properly, but luckily here it"s simple enough to just exclude transitive
// dependencies.
isTransitive = false
}
jmh(project(":exporters:otlp:trace")) {
// The opentelemetry-exporter-otlp-trace depends on this project itself. So don"t pull in
// the transitive dependencies.
isTransitive = false
}
// explicitly adding the opentelemetry-exporter-otlp dependencies
jmh(project(":exporters:otlp:common")) {
isTransitive = false
}
jmh(project(":proto"))

jmh("com.google.guava:guava")
jmh("io.grpc:grpc-api")
jmh("io.grpc:grpc-netty-shaded")
jmh("org.testcontainers:testcontainers") // testContainer for OTLP collector
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.extension.incubator.trace;

import io.opentelemetry.sdk.metrics.data.LongPointData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import java.util.Collection;
import java.util.OptionalLong;

public class BatchSpanProcessorMetrics {
jkwatson marked this conversation as resolved.
Show resolved Hide resolved

private final Collection<MetricData> allMetrics;
private final int numThreads;

public BatchSpanProcessorMetrics(Collection<MetricData> allMetrics, int numThreads) {
this.allMetrics = allMetrics;
this.numThreads = numThreads;
}

public double dropRatio() {
long exported = getMetric(false);
long dropped = getMetric(true);
long total = exported + dropped;
// Due to peculiarities of JMH reporting we have to divide this by the number of the
// concurrent threads running the actual benchmark.
return total == 0 ? 0 : (double) dropped / total / numThreads;
}

public long exportedSpans() {
return getMetric(false) / numThreads;
}

public long droppedSpans() {
return getMetric(true) / numThreads;
}

private long getMetric(boolean dropped) {
String labelValue = String.valueOf(dropped);
OptionalLong value =
allMetrics.stream()
.filter(metricData -> metricData.getName().equals("processedSpans"))
.filter(metricData -> !metricData.isEmpty())
.map(metricData -> metricData.getLongSumData().getPoints())
.flatMap(Collection::stream)
.filter(point -> labelValue.equals(point.getLabels().get("dropped")))
.mapToLong(LongPointData::getValue)
.findFirst();
return value.isPresent() ? value.getAsLong() : 0;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.extension.incubator.trace;

import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.util.Collection;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DelayingSpanExporter implements SpanExporter {

private final ScheduledExecutorService executor;

private final int delayMs;

public DelayingSpanExporter(int delayMs) {
executor = Executors.newScheduledThreadPool(5);
this.delayMs = delayMs;
}

@SuppressWarnings("FutureReturnValueIgnored")
@Override
public CompletableResultCode export(Collection<SpanData> spans) {
final CompletableResultCode result = new CompletableResultCode();
executor.schedule((Runnable) result::succeed, delayMs, TimeUnit.MILLISECONDS);
return result;
}

@Override
public CompletableResultCode flush() {
return CompletableResultCode.ofSuccess();
}

@Override
public CompletableResultCode shutdown() {
return CompletableResultCode.ofSuccess();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.extension.incubator.trace;

import com.google.common.collect.ImmutableList;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;

@State(Scope.Benchmark)
public class ExecutorServiceSpanProcessorBenchmark {

@Param({"0", "1", "5"})
private int delayMs;

@Param({"1000", "2000", "5000"})
private int spanCount;

private List<Span> spans;

private ExecutorServiceSpanProcessor processor;

@Setup(Level.Trial)
public final void setup() {
SpanExporter exporter = new DelayingSpanExporter(delayMs);
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
processor = ExecutorServiceSpanProcessor.builder(exporter, executor, true).build();

ImmutableList.Builder<Span> spans = ImmutableList.builderWithExpectedSize(spanCount);
Tracer tracer = SdkTracerProvider.builder().build().get("benchmarkTracer");
for (int i = 0; i < spanCount; i++) {
spans.add(tracer.spanBuilder("span").startSpan());
}
this.spans = spans.build();
}

@TearDown(Level.Trial)
public final void tearDown() {
processor.shutdown().join(10, TimeUnit.SECONDS);
}

/** Export spans through {@link ExecutorServiceSpanProcessor}. */
@Benchmark
@Fork(1)
@Threads(5)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 10, time = 1)
@OutputTimeUnit(TimeUnit.SECONDS)
public void export() {
for (Span span : spans) {
processor.onEnd((ReadableSpan) span);
}
processor.forceFlush().join(10, TimeUnit.MINUTES);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.extension.incubator.trace;

import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import org.openjdk.jmh.annotations.AuxCounters;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;

/*
* Run this along with a profiler to measure the CPU usage of BatchSpanProcessor's exporter thread.
*/
public class ExecutorServiceSpanProcessorCpuBenchmark {

@State(Scope.Benchmark)
public static class BenchmarkState {
private SdkMeterProvider sdkMeterProvider;
private ExecutorServiceSpanProcessor processor;
private Tracer tracer;
private int numThreads = 1;

@Param({"1"})
private int delayMs;

private long exportedSpans;
private long droppedSpans;

@Setup(Level.Iteration)
public final void setup() {
sdkMeterProvider = SdkMeterProvider.builder().buildAndRegisterGlobal();
SpanExporter exporter = new DelayingSpanExporter(delayMs);
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
processor = ExecutorServiceSpanProcessor.builder(exporter, executor, true).build();
tracer =
SdkTracerProvider.builder().addSpanProcessor(processor).build().get("benchmarkTracer");
}

@TearDown(Level.Iteration)
public final void recordMetrics() {
BatchSpanProcessorMetrics metrics =
new BatchSpanProcessorMetrics(sdkMeterProvider.collectAllMetrics(), numThreads);
exportedSpans = metrics.exportedSpans();
droppedSpans = metrics.droppedSpans();
}

@TearDown(Level.Iteration)
public final void tearDown() {
processor.shutdown().join(10, TimeUnit.SECONDS);
}
}

@State(Scope.Thread)
@AuxCounters(AuxCounters.Type.OPERATIONS)
public static class ThreadState {
BenchmarkState benchmarkState;

@TearDown(Level.Iteration)
public final void recordMetrics(BenchmarkState benchmarkState) {
this.benchmarkState = benchmarkState;
}

public long exportedSpans() {
return benchmarkState.exportedSpans;
}

public long droppedSpans() {
return benchmarkState.droppedSpans;
}
}

private static void doWork(BenchmarkState benchmarkState) {
benchmarkState.processor.onEnd(
(ReadableSpan) benchmarkState.tracer.spanBuilder("span").startSpan());
// This sleep is essential to maintain a steady state of the benchmark run by generating 10k
// spans per second per thread. Without this JMH outer loop consumes as much CPU as possible
// making comparing different processor versions difficult.
// Note that time spent outside of the sleep is negligible allowing this sleep to control
// span generation rate. Here we get 1 / 100_000 = 10K spans generated per second.
LockSupport.parkNanos(100_000);
}

@Benchmark
@Fork(1)
@Threads(1)
@Warmup(iterations = 1, time = 1)
@Measurement(iterations = 5, time = 5)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
public void export_01Thread(
BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) {
benchmarkState.numThreads = 1;
doWork(benchmarkState);
}

@Benchmark
@Fork(1)
@Threads(2)
@Warmup(iterations = 1, time = 1)
@Measurement(iterations = 5, time = 5)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
public void export_02Thread(
BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) {
benchmarkState.numThreads = 2;
doWork(benchmarkState);
}

@Benchmark
@Fork(1)
@Threads(5)
@Warmup(iterations = 1, time = 1)
@Measurement(iterations = 5, time = 5)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
public void export_05Thread(
BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) {
benchmarkState.numThreads = 5;
doWork(benchmarkState);
}

@Benchmark
@Fork(1)
@Threads(10)
@Warmup(iterations = 1, time = 1)
@Measurement(iterations = 5, time = 5)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
public void export_10Thread(
BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) {
benchmarkState.numThreads = 10;
doWork(benchmarkState);
}

@Benchmark
@Fork(1)
@Threads(20)
@Warmup(iterations = 1, time = 1)
@Measurement(iterations = 5, time = 5)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
public void export_20Thread(
BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) {
benchmarkState.numThreads = 20;
doWork(benchmarkState);
}
}
Loading