-
Notifications
You must be signed in to change notification settings - Fork 848
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
BatchSpanProcessor that uses provided ScheduledExecutorService #3036
BatchSpanProcessor that uses provided ScheduledExecutorService #3036
Conversation
Thank you for this PR! Please fill in the PR description, e.g. "Fixes #2980" if that is the case. |
this.ownsExecutorService = ownsExecutorService; | ||
this.executorService = executorService; | ||
this.future = | ||
executorService.scheduleAtFixedRate( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be better to use scheduleWithFixedDelay to avoid getting in a busy loop if export takes too long. That may mean dropped spans, but this is IMHO usually preferable to taking too much CPU time away.
Since we're getting so many different variant PRs for updates to the BSP (I think we're up to 4!), it might make sense to put them all into an incubator module so people have options and can try them all out to see what works best. |
sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/WorkerBase.java
Outdated
Show resolved
Hide resolved
@jkwatson I have the question about the incubator module. Is something I should do in this PR? |
If it's a general question about it, a Discussion might be better so it's more visible. |
@@ -13,6 +13,7 @@ extra["moduleName"] = "io.opentelemetry.sdk.extension.trace.incubator" | |||
dependencies { | |||
api(project(":api:all")) | |||
api(project(":sdk:all")) | |||
api(project(":sdk:metrics")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we want to leave metrics
as an implementation detail for now, like we have in the core sdk trace module.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
I'm interested to see how the various benchmarks run against this variant. Would you mind also copying over the benchmarks for the BatchSpanProcessor as well? And, if you could run them against this variant and compare to running them against the BSP on |
@jkwatson sure, I will run benchmarks and post results here. |
…y-java into executor-service-batch-span-processor
@jkwatson I've tried to run
I have JDK 15 installed on my machine. |
@jkwatson It turns out that one needs JDK 11 to run benchmarks. I've run them and it turns out that the performance of this variant is a little bit worse than the original |
@jkwatson I've merged |
@anuraaga could take a look at this PR? |
*/ | ||
public ExecutorServiceSpanProcessorBuilder setWorkerScheduleInterval(Duration interval) { | ||
requireNonNull(interval, "interval"); | ||
return setWorkerScheduleInterval(interval.toMillis(), TimeUnit.MILLISECONDS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
May as well use nanos
|
||
try { | ||
final CompletableResultCode result = spanExporter.export(new ArrayList<>(batch)); | ||
result.join(exporterTimeoutNanos, TimeUnit.NANOSECONDS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry that it took reading code to come up with this basic question - is there a use case for sharing an ExecutorService
if we're doing blocking I/O? For example, if we try to share a single thread between span processor and interval metric reader, than while we're waiting for spans to be exported, this thread is asleep doing nothing while metrics can't be processed. Or if we use two thread executor, then we may as well have used separate single-thread executors instead of opening up a can of worms for blocking I/O to affect other processes. @Oberon00 what do you think?
We had an asynchronous BSP at one point and for such a processor, sharing an executor makes a lot of sense.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
while we're waiting for spans to be exported, this thread is asleep doing nothing while metrics can't be processed
The current implementation of the IntervalMetricReader does metric collection and export on the same thread, so this can really be a problem. But if it did metric export and collection on separate threads, it could share only the metric exporting thread, not the collection thread.
We had an asynchronous BSP at one point
I think the join
will need to be removed and the BSP needs to use the whenComplete
mechanism for this implementation to become more robust (to maintain the timeout, maybe additionally use schedule
to signal the whenComplete callback to cancel the whenComplete).
This will also mean that we can no longer use scheduleWithFixedDelay
but need to schedule()
the next export ourselves whenever the last whenComplete
is done.
@anuraaga Does that sound reasonable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it could share only the metric exporting thread, not the collection thread.
I guess even with this, if we're not doing the I/O simultaneously it could cause issues with the export throughput. Pretty sure that's what you're thinking too just confirming :)
This will also mean that we can no longer use scheduleWithFixedDelay but need to schedule() the next export ourselves whenever the last whenComplete is done.
Yeah this seems ok to me - unless I'm missing something I think it's a prerequisite for having exporters share threads.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it could cause issues with the export throughput
I'm not sure how serious this is. Even if metrics and spans have to wait for each other, it depends on the intervals if this is a problem. In the worst case where the thread is always "busy" (at least waiting for I/O to complete) and metrics and spans cause about the same traffic, even when using two threads (one for metrics, one for spans) you get at most 2x improvement.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
…executor-service-batch-span-processor
…y-java into executor-service-batch-span-processor
FYI on the benchmarks...the CPU-based benchmarks should be unchanged, and this is intentional, as they really aren't supposed to be useful without a profiler hooked up. |
...r/src/jmh/java/io/opentelemetry/sdk/extension/incubator/trace/BatchSpanProcessorMetrics.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems reasonable to me, thanks!
public void run() { | ||
// nextExportTime is set for the first time in the constructor | ||
|
||
continueWork.set(true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this need to be an atomic field? It seems that a local boolean variable would work just as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Oberon00 I don't know what would happen if the next run of the worker Runnable
will be scheduled on a different thread than the previous one. If it is ok to have a local boolean variable, I will update the code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But it looks like the boolean is never used outside this method, except in shutdown, and there it seems to change nothing as you already check isShutdown too. But I may have misread the logic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isShutdown
- should the SpanProcessor shutdown. If true, then no more work should be done.
continueWork
- should the current run (loop) continue or not. If false, then at some point in the future the loop will be running again.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The question is: If continueWork.set(false);
is done in WorkerBase.shutdown, is isShutdown
also already true? If so, then that access to continueWork could be removed. And then we have the situation that at the beginning of run
continueWork is always set before it is read and continueWork is never used outside the method. In that case it can be transformed without change of behavior to a local boolean.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Oberon00 you are correct. I've converted continueWork
to local variable.
import java.util.logging.Level; | ||
import java.util.logging.Logger; | ||
|
||
abstract class WorkerBase implements Runnable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's probably ok since this is just in the incubator, but this class is confusing to me. It's unclear how one would use it in general to implement a Batching SpanProcessor. The responsibilities between the abstract class and a concrete implementation are not particularly well defined, as far as I can tell.
I'd also prefer it if we can figure out a solution that doesn't involve inheritance. Can the functionality in this class be something that is either a) usable directly, without extending it or b) this class is concrete and takes an implementation of some strategy interface in order to do its work?
At the absolute minimum, this class would need detailed documentation on how to use it properly, as I suspect that there are many gotchas that have to be done just right for it to work correctly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This class can reduce the amount of copied code from BatchSpanProcessor
worker.
Let me think about other solutions you've proposed above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jkwatson I've removed WorkerBase
class. I've introduced WorkerExporter
which contains all methods useful for exporting a batch of spans.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that WorkerExported
might be useful compared to WorkerBase
.
.setUpdater( | ||
result -> | ||
result.observe( | ||
queue.size(), Labels.of(spanProcessorTypeLabel, spanProcessorTypeValue))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
might as well make these labels statically allocated, so we don't have to recreate them every collection cycle.
int maxExportBatchSize, | ||
long exporterTimeoutNanos, | ||
BlockingQueue<ReadableSpan> queue, | ||
String spanProcessorTypeLabel, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a good reason to have these part of the constructor? Aren't they always the same? I'd rather just have them be constants, probably in the enclosing class.
if (lastElement != null) { | ||
batch.add(lastElement.toSpanData()); | ||
// drain queue | ||
queue.take(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rather than peek()
then later a take()
, why not just poll()
initially and skip the extra step?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't want the thread to be blocked when the queue is empty. With jctools
I can poll()
and skip the extra step.
ReadableSpan lastElement = queue.peek(); | ||
if (lastElement != null) { | ||
batch.add(lastElement.toSpanData()); | ||
// drain queue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this comment doesn't seem accurate. You're not draining the queue, you're just removing the element you just peek()
ed at, correct?
Also, in the CPU-optimization work that was done for the main BSP, we definitely found a significant overhead in working with an ArrayBlockingQueue this way. Any thoughts to using the same signalling technique and the jctools queue here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll try to plug in jctools queue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
all tests pass 👍
private final AtomicReference<CompletableResultCode> flushRequested; | ||
private final int maxExportBatchSize; | ||
|
||
WorkerExporter( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems weird to have a public class with all public methods with a non-public constructor. Does this class need to be public right now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nope, it can be in package scope
ScheduledExecutorService executorService, | ||
Logger logger, | ||
long exporterTimeoutNanos, | ||
BoundLongCounter exportedSpans, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename to "exportedSpanCounter"
private final Logger logger; | ||
private final long exporterTimeoutNanos; | ||
private final BoundLongCounter exportedSpans; | ||
private final AtomicReference<CompletableResultCode> flushRequested; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The contract on what this is for and how it's used needs to be very carefully documented if we're going to have this class be public. I might start by renaming it to something like "flushSignal" or something that makes it clear that it's a 2-way communication pathway between the span processor and this worker.
@jkwatson thank you for code review. I will address your comments as soon as I have time at work. |
@jkwatson I've addressed all comments. Could you re-review? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's give it a try in the incubator!
Fixes #2980