Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BatchSpanProcessor that uses provided ScheduledExecutorService #3036

Merged
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions sdk-extensions/executor-service-processor/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
plugins {
`java-library`
`maven-publish`

id("ru.vyarus.animalsniffer")
}

description = "OpenTelemetry SDK Extension: ExecutorService SpanProcessor"
extra["moduleName"] = "io.opentelemetry.sdk.extension.trace.export"

dependencies {
api(project(":api:all"))
api(project(":api:metrics"))
api(project(":sdk:all"))
}
1 change: 1 addition & 0 deletions sdk-extensions/tracing-incubator/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ extra["moduleName"] = "io.opentelemetry.sdk.extension.trace.incubator"
dependencies {
api(project(":api:all"))
api(project(":sdk:all"))
api(project(":sdk:metrics"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we want to leave metrics as an implementation detail for now, like we have in the core sdk trace module.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


implementation(project(":semconv"))

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.extension.incubator.trace;

import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.ReadWriteSpan;
import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.SpanProcessor;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class ExecutorServiceSpanProcessor implements SpanProcessor {

private static final String SPAN_PROCESSOR_TYPE_LABEL = "spanProcessorType";
private static final String SPAN_PROCESSOR_TYPE_VALUE =
ExecutorServiceSpanProcessor.class.getSimpleName();

private final Worker worker;
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
private final boolean ownsExecutorService;
private final ScheduledExecutorService executorService;
private final ScheduledFuture<?> future;

public static ExecutorServiceSpanProcessorBuilder builder(
SpanExporter spanExporter,
ScheduledExecutorService executorService,
boolean ownsExecutorService) {
return new ExecutorServiceSpanProcessorBuilder(
spanExporter, executorService, ownsExecutorService);
}

ExecutorServiceSpanProcessor(
SpanExporter spanExporter,
long scheduleDelayNanos,
int maxQueueSize,
int maxExportBatchSize,
long exporterTimeoutNanos,
ScheduledExecutorService executorService,
boolean ownsExecutorService,
long workerScheduleInterval) {
this.worker =
new Worker(
spanExporter,
scheduleDelayNanos,
maxExportBatchSize,
exporterTimeoutNanos,
new ArrayBlockingQueue<>(maxQueueSize),
SPAN_PROCESSOR_TYPE_LABEL,
SPAN_PROCESSOR_TYPE_VALUE);
this.ownsExecutorService = ownsExecutorService;
this.executorService = executorService;
this.future =
executorService.scheduleWithFixedDelay(
worker, workerScheduleInterval, workerScheduleInterval, TimeUnit.MILLISECONDS);
}

@Override
public void onStart(Context parentContext, ReadWriteSpan span) {}

@Override
public boolean isStartRequired() {
return false;
}

@Override
public void onEnd(ReadableSpan span) {
if (!span.getSpanContext().isSampled()) {
return;
}
worker.addSpan(span);
}

@Override
public boolean isEndRequired() {
return true;
}

@Override
public CompletableResultCode shutdown() {
if (isShutdown.getAndSet(true)) {
return CompletableResultCode.ofSuccess();
}
CompletableResultCode result = worker.shutdown();
// do the cleanup after worker finishes flush
result.whenComplete(
() -> {
future.cancel(false);
if (ownsExecutorService) {
executorService.shutdown();
}
});

return result;
}

@Override
public CompletableResultCode forceFlush() {
return worker.forceFlush();
}

// Visible for testing
List<SpanData> getBatch() {
return new ArrayList<>(worker.getBatch());
}

private static class Worker extends WorkerBase {

private final AtomicLong nextExportTime = new AtomicLong();

private final ArrayBlockingQueue<SpanData> batch;

private Worker(
SpanExporter spanExporter,
long scheduleDelayNanos,
int maxExportBatchSize,
long exporterTimeoutNanos,
BlockingQueue<ReadableSpan> queue,
String spanProcessorTypeLabel,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a good reason to have these part of the constructor? Aren't they always the same? I'd rather just have them be constants, probably in the enclosing class.

String spanProcessorTypeValue) {
super(
spanExporter,
scheduleDelayNanos,
maxExportBatchSize,
exporterTimeoutNanos,
queue,
spanProcessorTypeLabel,
spanProcessorTypeValue);

this.batch = new ArrayBlockingQueue<>(maxExportBatchSize);
updateNextExportTime();
}

private void updateNextExportTime() {
nextExportTime.set(System.nanoTime() + scheduleDelayNanos);
}

@Override
public void run() {
// nextExportTime is set for the first time in the constructor

continueWork.set(true);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be an atomic field? It seems that a local boolean variable would work just as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Oberon00 I don't know what would happen if the next run of the worker Runnable will be scheduled on a different thread than the previous one. If it is ok to have a local boolean variable, I will update the code.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But it looks like the boolean is never used outside this method, except in shutdown, and there it seems to change nothing as you already check isShutdown too. But I may have misread the logic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isShutdown - should the SpanProcessor shutdown. If true, then no more work should be done.
continueWork - should the current run (loop) continue or not. If false, then at some point in the future the loop will be running again.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The question is: If continueWork.set(false); is done in WorkerBase.shutdown, is isShutdown also already true? If so, then that access to continueWork could be removed. And then we have the situation that at the beginning of run continueWork is always set before it is read and continueWork is never used outside the method. In that case it can be transformed without change of behavior to a local boolean.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Oberon00 you are correct. I've converted continueWork to local variable.

while (continueWork.get()) {
if (flushRequested.get() != null) {
flush();
}

try {
ReadableSpan lastElement = queue.peek();
if (lastElement != null) {
batch.add(lastElement.toSpanData());
// drain queue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this comment doesn't seem accurate. You're not draining the queue, you're just removing the element you just peek()ed at, correct?
Also, in the CPU-optimization work that was done for the main BSP, we definitely found a significant overhead in working with an ArrayBlockingQueue this way. Any thoughts to using the same signalling technique and the jctools queue here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll try to plug in jctools queue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all tests pass 👍

queue.take();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than peek() then later a take(), why not just poll() initially and skip the extra step?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't want the thread to be blocked when the queue is empty. With jctools I can poll() and skip the extra step.

} else {
continueWork.set(false);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}

if (batch.size() >= maxExportBatchSize || System.nanoTime() >= nextExportTime.get()) {
exportCurrentBatch();
updateNextExportTime();
}
}
}

@Override
public Collection<SpanData> getBatch() {
return batch;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.extension.incubator.trace;

import static io.opentelemetry.api.internal.Utils.checkArgument;
import static java.util.Objects.requireNonNull;

import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ExecutorServiceSpanProcessorBuilder {

// Visible for testing
static final long DEFAULT_SCHEDULE_DELAY_MILLIS = 5000;
// Visible for testing
static final int DEFAULT_MAX_QUEUE_SIZE = 2048;
// Visible for testing
static final int DEFAULT_MAX_EXPORT_BATCH_SIZE = 512;
// Visible for testing
static final int DEFAULT_EXPORT_TIMEOUT_MILLIS = 30_000;
// Visible for testing
static final int WORKER_SCHEDULE_INTERVAL = 100;

private final SpanExporter spanExporter;
private final boolean ownsExecutorService;
private final ScheduledExecutorService executorService;
private long scheduleDelayNanos = TimeUnit.MILLISECONDS.toNanos(DEFAULT_SCHEDULE_DELAY_MILLIS);
private int maxQueueSize = DEFAULT_MAX_QUEUE_SIZE;
private int maxExportBatchSize = DEFAULT_MAX_EXPORT_BATCH_SIZE;
private long exporterTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(DEFAULT_EXPORT_TIMEOUT_MILLIS);
private long workerScheduleInterval = WORKER_SCHEDULE_INTERVAL;

ExecutorServiceSpanProcessorBuilder(
SpanExporter spanExporter,
ScheduledExecutorService executorService,
boolean ownsExecutorService) {
this.spanExporter = spanExporter;
this.ownsExecutorService = ownsExecutorService;
this.executorService = executorService;
}

/**
* Sets the delay interval between two consecutive exports. If unset, defaults to {@value
* DEFAULT_SCHEDULE_DELAY_MILLIS}ms.
*/
public ExecutorServiceSpanProcessorBuilder setScheduleDelay(long delay, TimeUnit unit) {
requireNonNull(unit, "unit");
checkArgument(delay >= 0, "delay must be non-negative");
scheduleDelayNanos = unit.toNanos(delay);
return this;
}

/**
* Sets the delay interval between two consecutive exports. If unset, defaults to {@value
* DEFAULT_SCHEDULE_DELAY_MILLIS}ms.
*/
public ExecutorServiceSpanProcessorBuilder setScheduleDelay(Duration delay) {
requireNonNull(delay, "delay");
return setScheduleDelay(delay.toNanos(), TimeUnit.NANOSECONDS);
}

// Visible for testing
long getScheduleDelayNanos() {
return scheduleDelayNanos;
}

/**
* Sets the maximum time an export will be allowed to run before being cancelled. If unset,
* defaults to {@value DEFAULT_EXPORT_TIMEOUT_MILLIS}ms.
*/
public ExecutorServiceSpanProcessorBuilder setExporterTimeout(long timeout, TimeUnit unit) {
requireNonNull(unit, "unit");
checkArgument(timeout >= 0, "timeout must be non-negative");
exporterTimeoutNanos = unit.toNanos(timeout);
return this;
}

/**
* Sets the maximum time an export will be allowed to run before being cancelled. If unset,
* defaults to {@value DEFAULT_EXPORT_TIMEOUT_MILLIS}ms.
*/
public ExecutorServiceSpanProcessorBuilder setExporterTimeout(Duration timeout) {
requireNonNull(timeout, "timeout");
return setExporterTimeout(timeout.toNanos(), TimeUnit.NANOSECONDS);
}

// Visible for testing
long getExporterTimeoutNanos() {
return exporterTimeoutNanos;
}

/**
* Sets the maximum number of Spans that are kept in the queue before start dropping.
*
* <p>See the BatchSampledSpansProcessor class description for a high-level design description of
* this class.
*
* <p>Default value is {@code 2048}.
*
* @param maxQueueSize the maximum number of Spans that are kept in the queue before start
* dropping.
* @return this.
* @see ExecutorServiceSpanProcessorBuilder#DEFAULT_MAX_QUEUE_SIZE
*/
public ExecutorServiceSpanProcessorBuilder setMaxQueueSize(int maxQueueSize) {
this.maxQueueSize = maxQueueSize;
return this;
}

// Visible for testing
int getMaxQueueSize() {
return maxQueueSize;
}

/**
* Sets the maximum batch size for every export. This must be smaller or equal to {@code
* maxQueuedSpans}.
*
* <p>Default value is {@code 512}.
*
* @param maxExportBatchSize the maximum batch size for every export.
* @return this.
* @see ExecutorServiceSpanProcessorBuilder#DEFAULT_MAX_EXPORT_BATCH_SIZE
*/
public ExecutorServiceSpanProcessorBuilder setMaxExportBatchSize(int maxExportBatchSize) {
checkArgument(maxExportBatchSize > 0, "maxExportBatchSize must be positive.");
this.maxExportBatchSize = maxExportBatchSize;
return this;
}

/**
* Sets the delay interval between two consecutive runs of the worker job. If unset, defaults to
* {@value WORKER_SCHEDULE_INTERVAL}ms.
*/
public ExecutorServiceSpanProcessorBuilder setWorkerScheduleInterval(Duration interval) {
requireNonNull(interval, "interval");
return setWorkerScheduleInterval(interval.toMillis(), TimeUnit.MILLISECONDS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May as well use nanos

}

/**
* Sets the delay interval between two consecutive runs of the worker job. If unset, defaults to
* {@value WORKER_SCHEDULE_INTERVAL}ms.
*/
public ExecutorServiceSpanProcessorBuilder setWorkerScheduleInterval(
long interval, TimeUnit unit) {
requireNonNull(unit, "unit");
checkArgument(interval >= 0, "interval must be non-negative");
workerScheduleInterval = unit.toMillis(interval);
return this;
}

// Visible for testing
long getWorkerScheduleInterval() {
return workerScheduleInterval;
}

// Visible for testing
int getMaxExportBatchSize() {
return maxExportBatchSize;
}

/**
* Returns a new {@link ExecutorServiceSpanProcessor} that batches, then converts spans to proto
* and forwards them to the given {@code spanExporter}.
*
* @return a new {@link ExecutorServiceSpanProcessor}.
* @throws NullPointerException if the {@code spanExporter} is {@code null}.
*/
public ExecutorServiceSpanProcessor build() {
return new ExecutorServiceSpanProcessor(
spanExporter,
scheduleDelayNanos,
maxQueueSize,
maxExportBatchSize,
exporterTimeoutNanos,
executorService,
ownsExecutorService,
workerScheduleInterval);
}
}
Loading