Skip to content

Commit

Permalink
chore: add plumbing to allow customer to enable reporting OT metrics (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
agrawal-siddharth authored Jul 26, 2024
1 parent 4ae1ee1 commit 9839d4d
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ class ConnectionWorker implements AutoCloseable {

private static String tableMatching = "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/";
private static Pattern streamPatternTable = Pattern.compile(tableMatching);
private final boolean enableOpenTelemetry;
private Meter writeMeter;
static AttributeKey<String> telemetryKeyTableId = AttributeKey.stringKey("table_id");
static AttributeKey<String> telemetryKeyWriterId = AttributeKey.stringKey("writer_id");
Expand Down Expand Up @@ -480,7 +481,8 @@ public ConnectionWorker(
@Nullable String compressorName,
BigQueryWriteSettings clientSettings,
RetrySettings retrySettings,
boolean enableRequestProfiler)
boolean enableRequestProfiler,
boolean enableOpenTelemetry)
throws IOException {
this.lock = new ReentrantLock();
this.hasMessageInWaitingQueue = lock.newCondition();
Expand All @@ -505,6 +507,7 @@ public ConnectionWorker(
this.retrySettings = retrySettings;
this.telemetryAttributes = buildOpenTelemetryAttributes();
this.requestProfilerHook = new RequestProfiler.RequestProfilerHook(enableRequestProfiler);
this.enableOpenTelemetry = enableOpenTelemetry;
registerOpenTelemetryMetrics();

// Always recreate a client for connection worker.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ public abstract static class Builder {
private static Settings settings = Settings.builder().build();

private final boolean enableRequestProfiler;
private final boolean enableOpenTelemetry;

ConnectionWorkerPool(
long maxInflightRequests,
Expand All @@ -216,7 +217,8 @@ public abstract static class Builder {
@Nullable String comperssorName,
BigQueryWriteSettings clientSettings,
RetrySettings retrySettings,
boolean enableRequestProfiler) {
boolean enableRequestProfiler,
boolean enableOpenTelemetry) {
this.maxInflightRequests = maxInflightRequests;
this.maxInflightBytes = maxInflightBytes;
this.maxRetryDuration = maxRetryDuration;
Expand All @@ -227,6 +229,7 @@ public abstract static class Builder {
this.currentMaxConnectionCount = settings.minConnectionsPerRegion();
this.retrySettings = retrySettings;
this.enableRequestProfiler = enableRequestProfiler;
this.enableOpenTelemetry = enableOpenTelemetry;
}

/**
Expand Down Expand Up @@ -409,7 +412,8 @@ private ConnectionWorker createConnectionWorker(
compressorName,
clientSettings,
retrySettings,
enableRequestProfiler);
enableRequestProfiler,
enableOpenTelemetry);
connectionWorkerPool.add(connectionWorker);
log.info(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,12 @@ public Builder setEnableLatencyProfiler(boolean enableLatencyProfiler) {
return this;
}

/** Enable generation of metrics for OpenTelemetry. */
public Builder setEnableOpenTelemetry(boolean enableOpenTelemetry) {
this.schemaAwareStreamWriterBuilder.setEnableOpenTelemetry(enableOpenTelemetry);
return this;
}

/**
* Sets the default missing value interpretation value if the column is not presented in the
* missing_value_interpretations map.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ private SchemaAwareStreamWriter(Builder<T> builder)
if (builder.enableRequestProfiler) {
requestProfilerHook.startPeriodicalReportFlushing();
}
streamWriterBuilder.setEnableOpenTelemetry(builder.enableOpenTelemetry);
this.streamWriter = streamWriterBuilder.build();
this.streamName = builder.streamName;
this.tableSchema = builder.tableSchema;
Expand Down Expand Up @@ -477,6 +478,7 @@ public static final class Builder<T> {
private String clientId;

private boolean enableRequestProfiler = false;
private boolean enableOpenTelemetry = false;

private static final String streamPatternString =
"(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/[^/]+";
Expand Down Expand Up @@ -702,6 +704,12 @@ public Builder setEnableLatencyProfiler(boolean enableLatencyProfiler) {
return this;
}

/** Enable generation of metrics for OpenTelemetry. */
public Builder setEnableOpenTelemetry(boolean enableOpenTelemetry) {
this.enableOpenTelemetry = enableOpenTelemetry;
return this;
}

/**
* Builds SchemaAwareStreamWriter
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,8 @@ private StreamWriter(Builder builder) throws IOException {
builder.compressorName,
clientSettings,
builder.retrySettings,
builder.enableRequestProfiler));
builder.enableRequestProfiler,
builder.enableOpenTelemetry));
} else {
if (!isDefaultStream(streamName)) {
log.warning(
Expand Down Expand Up @@ -321,7 +322,8 @@ private StreamWriter(Builder builder) throws IOException {
builder.compressorName,
client.getSettings(),
builder.retrySettings,
builder.enableRequestProfiler);
builder.enableRequestProfiler,
builder.enableOpenTelemetry);
}));
validateFetchedConnectonPool(builder);
// If the client is not from outside, then shutdown the client we created.
Expand Down Expand Up @@ -699,6 +701,7 @@ public static final class Builder {
MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED;

private boolean enableRequestProfiler = false;
private boolean enableOpenTelemetry = false;

private RetrySettings retrySettings = null;

Expand Down Expand Up @@ -857,6 +860,12 @@ public Builder setEnableLatencyProfiler(boolean enableLatencyProfiler) {
return this;
}

/** Enable generation of metrics for OpenTelemetry. */
public Builder setEnableOpenTelemetry(boolean enableOpenTelemetry) {
this.enableOpenTelemetry = enableOpenTelemetry;
return this;
}

/**
* Enable client lib automatic retries on request level errors.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,7 @@ ConnectionWorkerPool createConnectionWorkerPool(
null,
clientSettings,
retrySettings,
/*enableRequestProfiler=*/ false);
/*enableRequestProfiler=*/ false,
/*enableOpenTelemetry*/ false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,8 @@ public void testAppendButInflightQueueFull() throws Exception {
null,
client.getSettings(),
retrySettings,
/*enableRequestProfiler=*/ false);
/*enableRequestProfiler=*/ false,
/*enableOpenTelemetry=*/ false);
testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(1));
ConnectionWorker.setMaxInflightQueueWaitTime(500);

Expand Down Expand Up @@ -403,7 +404,8 @@ public void testThrowExceptionWhileWithinAppendLoop() throws Exception {
null,
client.getSettings(),
retrySettings,
/*enableRequestProfiler=*/ false);
/*enableRequestProfiler=*/ false,
/*enableOpenTelemetry=*/ false);
testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(1));
ConnectionWorker.setMaxInflightQueueWaitTime(500);

Expand Down Expand Up @@ -473,7 +475,8 @@ public void testLocationMismatch() throws Exception {
null,
client.getSettings(),
retrySettings,
/*enableRequestProfiler=*/ false);
/*enableRequestProfiler=*/ false,
/*enableOpenTelemetry=*/ false);
StatusRuntimeException ex =
assertThrows(
StatusRuntimeException.class,
Expand Down Expand Up @@ -506,7 +509,8 @@ public void testStreamNameMismatch() throws Exception {
null,
client.getSettings(),
retrySettings,
/*enableRequestProfiler=*/ false);
/*enableRequestProfiler=*/ false,
/*enableOpenTelemetry=*/ false);
StatusRuntimeException ex =
assertThrows(
StatusRuntimeException.class,
Expand Down Expand Up @@ -560,7 +564,8 @@ private ConnectionWorker createConnectionWorker(
null,
client.getSettings(),
retrySettings,
/*enableRequestProfiler=*/ false);
/*enableRequestProfiler=*/ false,
/*enableOpenTelemetry=*/ false);
}

private ProtoSchema createProtoSchema(String protoName) {
Expand Down Expand Up @@ -657,7 +662,8 @@ public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws E
null,
client.getSettings(),
retrySettings,
/*enableRequestProfiler=*/ false);
/*enableRequestProfiler=*/ false,
/*enableOpenTelemetry=*/ false);
org.threeten.bp.Duration durationSleep = org.threeten.bp.Duration.ofSeconds(2);
testBigQueryWrite.setResponseSleep(durationSleep);

Expand Down Expand Up @@ -733,7 +739,8 @@ public void testLongTimeIdleWontFail() throws Exception {
null,
client.getSettings(),
retrySettings,
/*enableRequestProfiler=*/ false);
/*enableRequestProfiler=*/ false,
/*enableOpenTelemetry=*/ false);

long appendCount = 10;
for (int i = 0; i < appendCount * 2; i++) {
Expand Down Expand Up @@ -779,7 +786,8 @@ private void exerciseOpenTelemetryAttributesWithStreamNames(String streamName, S
null,
client.getSettings(),
retrySettings,
/*enableRequestProfiler=*/ false);
/*enableRequestProfiler=*/ false,
/*enableOpenTelemetry=*/ false);

Attributes attributes = connectionWorker.getTelemetryAttributes();
String attributesTableId = attributes.get(ConnectionWorker.telemetryKeyTableId);
Expand Down Expand Up @@ -820,7 +828,8 @@ void exerciseOpenTelemetryAttributesWithTraceId(
null,
client.getSettings(),
retrySettings,
/*enableRequestProfiler=*/ false);
/*enableRequestProfiler=*/ false,
/*enableOpenTelemetry=*/ false);

Attributes attributes = connectionWorker.getTelemetryAttributes();
checkOpenTelemetryTraceIdAttribute(attributes, 0, expectedField1);
Expand Down

0 comments on commit 9839d4d

Please sign in to comment.