BatchSpanProcessor that uses provided ScheduledExecutorService #3036

Merged

This file was deleted.

1 change: 1 addition & 0 deletions sdk-extensions/tracing-incubator/build.gradle.kts
@@ -13,6 +13,7 @@ extra["moduleName"] = "io.opentelemetry.sdk.extension.trace.incubator"
dependencies {
api(project(":api:all"))
api(project(":sdk:all"))
api(project(":sdk:metrics"))
Contributor

I think we want to leave metrics as an implementation detail for now, like we have in the core sdk trace module.

Contributor Author

👍


implementation(project(":semconv"))

@@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

-package io.opentelemetry.sdk.extension.trace.export;
+package io.opentelemetry.sdk.extension.incubator.trace;

import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.CompletableResultCode;
@@ -12,14 +12,16 @@
import io.opentelemetry.sdk.trace.SpanProcessor;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import io.opentelemetry.sdk.trace.export.Worker;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class ExecutorServiceSpanProcessor implements SpanProcessor {

@@ -114,4 +116,72 @@ public CompletableResultCode forceFlush() {
List<SpanData> getBatch() {
return new ArrayList<>(worker.getBatch());
}

private static class Worker extends WorkerBase {

private final AtomicLong nextExportTime = new AtomicLong();

private final ArrayBlockingQueue<SpanData> batch;

private Worker(
SpanExporter spanExporter,
long scheduleDelayNanos,
int maxExportBatchSize,
long exporterTimeoutNanos,
BlockingQueue<ReadableSpan> queue,
String spanProcessorTypeLabel,
Contributor

Is there a good reason to make these part of the constructor? Aren't they always the same? I'd rather have them be constants, probably in the enclosing class.
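
A minimal sketch of the suggestion above, assuming the label and value never vary per instance. The class name, constant names, and string values here are hypothetical; the real label and value are not shown in this diff.

```java
// Hypothetical stand-in for the processor; not the code under review.
final class ProcessorWithConstants {
  // Constants live in the enclosing class instead of being constructor arguments.
  private static final String SPAN_PROCESSOR_TYPE_LABEL = "spanProcessorType";
  private static final String SPAN_PROCESSOR_TYPE_VALUE = "ProcessorWithConstants";

  private final Worker worker;

  ProcessorWithConstants(int maxExportBatchSize) {
    // The worker constructor only takes values that can actually differ.
    this.worker = new Worker(maxExportBatchSize);
  }

  String describeWorker() {
    return worker.describe();
  }

  private static final class Worker {
    private final int maxExportBatchSize;

    Worker(int maxExportBatchSize) {
      this.maxExportBatchSize = maxExportBatchSize;
    }

    String describe() {
      // A nested class can read the enclosing class's private constants directly.
      return SPAN_PROCESSOR_TYPE_LABEL
          + "="
          + SPAN_PROCESSOR_TYPE_VALUE
          + ", maxExportBatchSize="
          + maxExportBatchSize;
    }
  }
}
```
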

String spanProcessorTypeValue) {
super(
spanExporter,
scheduleDelayNanos,
maxExportBatchSize,
exporterTimeoutNanos,
queue,
spanProcessorTypeLabel,
spanProcessorTypeValue);

this.batch = new ArrayBlockingQueue<>(maxExportBatchSize);
updateNextExportTime();
}

private void updateNextExportTime() {
nextExportTime.set(System.nanoTime() + scheduleDelayNanos);
}

@Override
public void run() {
// nextExportTime is set for the first time in the constructor

continueWork.set(true);
Member

Does this need to be an atomic field? It seems that a local boolean variable would work just as well.

Contributor Author

@Oberon00 I don't know what would happen if the next run of the worker Runnable is scheduled on a different thread than the previous one. If it is OK to have a local boolean variable, I will update the code.

Member

But it looks like the boolean is never used outside this method, except in shutdown, and there it seems to change nothing as you already check isShutdown too. But I may have misread the logic.

Contributor Author

isShutdown: whether the SpanProcessor should shut down. If true, no more work should be done.
continueWork: whether the current run (loop) should continue. If false, the current run ends and the loop will run again at some point in the future.

Member

The question is: If continueWork.set(false); is done in WorkerBase.shutdown, is isShutdown also already true? If so, then that access to continueWork could be removed. And then we have the situation that at the beginning of run continueWork is always set before it is read and continueWork is never used outside the method. In that case it can be transformed without change of behavior to a local boolean.

Contributor Author

@Oberon00 you are correct. I've converted continueWork to a local variable.
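
The outcome of this thread can be sketched roughly as follows. This is an illustration under stated assumptions, not the merged code: `LocalFlagWorker`, its `String` payload, and the `exported` list are hypothetical stand-ins. The point it shows is that a flag written at the start of `run()` and never read elsewhere can be a plain local, because a local variable belongs to a single invocation regardless of which thread executes it. Only `isShutdown`, which other threads set, needs to stay atomic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the worker discussed above.
final class LocalFlagWorker implements Runnable {
  private final Queue<String> queue = new ConcurrentLinkedQueue<>();
  private final List<String> exported = new ArrayList<>();
  // Set from other threads, so it remains an atomic field.
  private final AtomicBoolean isShutdown = new AtomicBoolean(false);

  void offer(String span) {
    queue.offer(span);
  }

  void shutdown() {
    isShutdown.set(true);
  }

  List<String> exported() {
    return exported;
  }

  @Override
  public void run() {
    // Always assigned before it is read and never used outside this call,
    // so a local boolean behaves the same as the former atomic field.
    boolean continueWork = true;
    while (continueWork && !isShutdown.get()) {
      String next = queue.poll();
      if (next != null) {
        exported.add(next);
      } else {
        // Queue is empty: end this run; a later scheduled run picks up new items.
        continueWork = false;
      }
    }
  }
}
```
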

while (continueWork.get()) {
if (flushRequested.get() != null) {
flush();
}

try {
ReadableSpan lastElement = queue.peek();
if (lastElement != null) {
batch.add(lastElement.toSpanData());
// drain queue
Contributor

This comment doesn't seem accurate. You're not draining the queue, you're just removing the element you just peek()ed at, correct?
Also, in the CPU-optimization work that was done for the main BSP, we definitely found a significant overhead in working with an ArrayBlockingQueue this way. Any thoughts on using the same signalling technique and the jctools queue here?

Contributor Author

I'll try to plug in the jctools queue.

Contributor Author

all tests pass 👍
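
For contrast with the single-element removal questioned above, here is a small sketch of what actually draining a queue could look like using the standard `BlockingQueue.drainTo(Collection, int)`. The `DrainSketch` class and `drainBatch` helper are hypothetical names, and this uses a JDK queue rather than jctools, so it only illustrates the semantics, not the performance characteristics discussed in the thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

// Hypothetical helper, for illustration only.
final class DrainSketch {
  // Moves up to maxBatchSize queued items into a new list without blocking.
  static <T> List<T> drainBatch(BlockingQueue<T> queue, int maxBatchSize) {
    List<T> batch = new ArrayList<>(maxBatchSize);
    // drainTo removes every available element, up to the given limit, in one call.
    queue.drainTo(batch, maxBatchSize);
    return batch;
  }
}
```
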

queue.take();
Contributor

Rather than peek() then later a take(), why not just poll() initially and skip the extra step?

Contributor Author

I didn't want the thread to be blocked when the queue is empty. With jctools I can poll() and skip the extra step.
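
The two approaches in this exchange can be compared side by side. This is a sketch with hypothetical names (`PollVersusPeekTake`, `peekThenTake`, `pollOnce`), written against the JDK `BlockingQueue` interface. It assumes a single consumer, so the `take()` that follows a non-null `peek()` does not block. `poll()` is declared on `java.util.Queue`, returns `null` on an empty queue, and never blocks, which is why it can replace the two-step pattern.

```java
import java.util.concurrent.BlockingQueue;

// Hypothetical helper class, for illustration only.
final class PollVersusPeekTake {
  // Two-step pattern from the diff: look at the head, then remove it separately.
  static <T> T peekThenTake(BlockingQueue<T> queue) {
    T head = queue.peek();
    if (head == null) {
      return null; // empty queue: return without blocking
    }
    try {
      queue.take(); // removes the element just peeked (single consumer assumed)
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return null;
    }
    return head;
  }

  // Single-step alternative: removes and returns the head, or null if empty.
  static <T> T pollOnce(BlockingQueue<T> queue) {
    return queue.poll();
  }
}
```

With one consumer, both methods return the same element for a non-empty queue and `null` for an empty one; the second simply avoids the extra call and the checked exception.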

} else {
continueWork.set(false);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}

if (batch.size() >= maxExportBatchSize || System.nanoTime() >= nextExportTime.get()) {
exportCurrentBatch();
updateNextExportTime();
}
}
}

@Override
public Collection<SpanData> getBatch() {
return batch;
}
}
}
@@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

-package io.opentelemetry.sdk.extension.trace.export;
+package io.opentelemetry.sdk.extension.incubator.trace;

import static io.opentelemetry.api.internal.Utils.checkArgument;
import static java.util.Objects.requireNonNull;
@@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

-package io.opentelemetry.sdk.trace.export;
+package io.opentelemetry.sdk.extension.incubator.trace;

import io.opentelemetry.api.metrics.BoundLongCounter;
import io.opentelemetry.api.metrics.GlobalMetricsProvider;
@@ -13,6 +13,7 @@
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
@@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

-package io.opentelemetry.sdk.extension.trace.export;
+package io.opentelemetry.sdk.extension.incubator.trace;

import io.opentelemetry.api.internal.GuardedBy;
import io.opentelemetry.sdk.common.CompletableResultCode;
@@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

-package io.opentelemetry.sdk.extension.trace.export;
+package io.opentelemetry.sdk.extension.incubator.trace;

import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.data.SpanData;
@@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

-package io.opentelemetry.sdk.extension.trace.export;
+package io.opentelemetry.sdk.extension.incubator.trace;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

-package io.opentelemetry.sdk.extension.trace.export;
+package io.opentelemetry.sdk.extension.incubator.trace;

import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.data.SpanData;