Skip to content

Commit

Permalink
Allow forceFlush of IntervalMetricReader, globally too for autoconfig…
Browse files Browse the repository at this point in the history
…ure / agent. (#3385)
  • Loading branch information
Anuraag Agrawal authored Jul 9, 2021
1 parent 3111038 commit f13fe79
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ version of the OpenTelemetry schema the files were generated from. There are no
- You can now assign an OpenTelemetry schema URL to a `Meter` via the new `MeterBuilder` class that is
accessed via the `MeterProvider` or any global instances that delegate to one.
- The metrics SDK now utilizes `Attributes` rather than `Labels` internally.
- You can now register an `IntervalMetricReader` as global and `forceFlush` the global reader.

---
## Version 1.3.0 - 2021-06-09
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ private static void configureIntervalMetricReader(
if (exportInterval != null) {
readerBuilder.setExportIntervalMillis(exportInterval.toMillis());
}
IntervalMetricReader reader = readerBuilder.buildAndStart();
IntervalMetricReader reader = readerBuilder.build().startAndRegisterGlobal();
Runtime.getRuntime().addShutdownHook(new Thread(reader::shutdown));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.concurrent.Immutable;
Expand All @@ -30,12 +31,31 @@
public final class IntervalMetricReader {
private static final Logger logger = Logger.getLogger(IntervalMetricReader.class.getName());

private static final AtomicReference<IntervalMetricReader> globalIntervalMetricReader =
new AtomicReference<>();

private final Exporter exporter;
private final ScheduledExecutorService scheduler;

private volatile ScheduledFuture<?> scheduledFuture;
private final Object lock = new Object();

/**
* {@linkplain IntervalMetricReader#forceFlush() Force flushes} the globally registered {@link
* IntervalMetricReader} if available, or does nothing otherwise.
*/
public static CompletableResultCode forceFlushGlobal() {
IntervalMetricReader intervalMetricReader = globalIntervalMetricReader.get();
if (intervalMetricReader != null) {
return intervalMetricReader.forceFlush();
}
return CompletableResultCode.ofSuccess();
}

static void resetGlobalForTest() {
globalIntervalMetricReader.set(null);
}

/** Stops the scheduled task and calls export one more time. */
public CompletableResultCode shutdown() {
final CompletableResultCode result = new CompletableResultCode();
Expand Down Expand Up @@ -75,6 +95,14 @@ public static IntervalMetricReaderBuilder builder() {
return new IntervalMetricReaderBuilder(InternalState.builder());
}

/**
* Requests the {@link IntervalMetricReader} to export current metrics and returns a {@link
* CompletableResultCode} which is completed when the flush is finished.
*/
public CompletableResultCode forceFlush() {
return exporter.doRun();
}

IntervalMetricReader(InternalState internalState) {
this(
internalState,
Expand All @@ -87,11 +115,7 @@ public static IntervalMetricReaderBuilder builder() {
this.scheduler = intervalMetricReader;
}

/**
* Starts this {@link IntervalMetricReader} to report to the configured exporter.
*
* @return this for fluent usage along with the builder.
*/
/** Starts this {@link IntervalMetricReader} to report to the configured exporter. */
public IntervalMetricReader start() {
synchronized (lock) {
if (scheduledFuture != null) {
Expand All @@ -107,6 +131,18 @@ public IntervalMetricReader start() {
}
}

/**
* Starts this {@link IntervalMetricReader} and registers it as the global {@link
* IntervalMetricReader}.
*/
public IntervalMetricReader startAndRegisterGlobal() {
start();
if (!globalIntervalMetricReader.compareAndSet(null, this)) {
logger.log(Level.WARNING, "Global IntervalMetricReader already registered, ignoring.");
}
return this;
}

private static final class Exporter implements Runnable {

private final InternalState internalState;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,51 @@ void intervalExport() throws Exception {
}
}

@Test
void forceFlush() throws Exception {
WaitingMetricExporter waitingMetricExporter = new WaitingMetricExporter();
IntervalMetricReader intervalMetricReader =
IntervalMetricReader.builder()
// Will force flush.
.setExportIntervalMillis(Long.MAX_VALUE)
.setMetricExporter(waitingMetricExporter)
.setMetricProducers(Collections.singletonList(metricProducer))
.buildAndStart();

assertThat(intervalMetricReader.forceFlush().join(10, TimeUnit.SECONDS).isSuccess()).isTrue();

try {
assertThat(waitingMetricExporter.waitForNumberOfExports(1))
.containsExactly(Collections.singletonList(METRIC_DATA));
} finally {
intervalMetricReader.shutdown();
}
}

@Test
void forceFlushGlobal() throws Exception {
WaitingMetricExporter waitingMetricExporter = new WaitingMetricExporter();
IntervalMetricReader intervalMetricReader =
IntervalMetricReader.builder()
// Will force flush.
.setExportIntervalMillis(Long.MAX_VALUE)
.setMetricExporter(waitingMetricExporter)
.setMetricProducers(Collections.singletonList(metricProducer))
.build()
.startAndRegisterGlobal();

assertThat(IntervalMetricReader.forceFlushGlobal().join(10, TimeUnit.SECONDS).isSuccess())
.isTrue();

try {
assertThat(waitingMetricExporter.waitForNumberOfExports(1))
.containsExactly(Collections.singletonList(METRIC_DATA));
} finally {
intervalMetricReader.shutdown();
IntervalMetricReader.resetGlobalForTest();
}
}

@Test
@Timeout(2)
public void intervalExport_exporterThrowsException() throws Exception {
Expand Down

0 comments on commit f13fe79

Please sign in to comment.