Skip to content

Commit

Permalink
Add some dimensions to help troubleshoot read failures. (#11989)
Browse files Browse the repository at this point in the history
  • Loading branch information
tryangul committed Apr 5, 2024
1 parent afbd230 commit 6f794c4
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package io.airbyte.workers.payload

import io.airbyte.commons.json.JsonSerde
import io.airbyte.metrics.lib.ApmTraceUtils
import io.airbyte.metrics.lib.MetricAttribute
import io.airbyte.metrics.lib.MetricClient
import io.airbyte.metrics.lib.MetricTags
import io.airbyte.metrics.lib.OssMetricsRegistry
import io.airbyte.workers.storage.StorageClient
import io.github.oshai.kotlinlogging.KotlinLogging

private val logger = KotlinLogging.logger {}

/**
* Writes and reads activity payloads to and from the configured object store.
Expand Down Expand Up @@ -78,6 +82,8 @@ class ActivityPayloadStorageClient(
return expected
}

ApmTraceUtils.addTagsToTrace(mapOf(Pair(MetricTags.URI_ID, uri.id), Pair(MetricTags.URI_VERSION, uri.version)))

val baseAttrs =
attrs +
listOf(
Expand All @@ -91,7 +97,13 @@ class ActivityPayloadStorageClient(
try {
remote = readJSON(uri, target)
} catch (e: Exception) {
metricClient.count(OssMetricsRegistry.PAYLOAD_FAILURE_READ, 1, *baseAttrs.toTypedArray())
logger.error { e }

ApmTraceUtils.addExceptionToTrace(e)
val attrsWithException =
baseAttrs + MetricAttribute(MetricTags.FAILURE_CAUSE, e.javaClass.simpleName)

metricClient.count(OssMetricsRegistry.PAYLOAD_FAILURE_READ, 1, *attrsWithException.toTypedArray())

return expected
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class MetricTags {
public static final String NOTIFICATION_CLIENT = "notification_client";
public static final String RECORD_COUNT_TYPE = "record_count_type";
public static final String RELEASE_STAGE = "release_stage";
public static final String RESET_WORKFLOW_FAILURE_CAUSE = "failure_cause";
public static final String FAILURE_CAUSE = "failure_cause";
public static final String SOURCE_ID = "source_id";
public static final String SOURCE_IMAGE = "source_image";
public static final String STATUS = "status";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void recordWorkflowCountMetric(final RecordMetricInput metricInput) {
if (metricInput.getMetricAttributes() != null) {
baseMetricAttributes.addAll(Stream.of(metricInput.getMetricAttributes()).collect(Collectors.toList()));
}
metricInput.getFailureCause().ifPresent(fc -> baseMetricAttributes.add(new MetricAttribute(MetricTags.RESET_WORKFLOW_FAILURE_CAUSE, fc.name())));
metricInput.getFailureCause().ifPresent(fc -> baseMetricAttributes.add(new MetricAttribute(MetricTags.FAILURE_CAUSE, fc.name())));
metricClient.count(metricInput.getMetricName(), 1L, baseMetricAttributes.toArray(new MetricAttribute[] {}));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ public StandardSyncOutput replicateV2(final ReplicationActivityInput replication
LOGGER.info("sync summary after backfill: {}", standardSyncOutput);

final StandardSyncOutput output = payloadChecker.validatePayloadSize(standardSyncOutput, metricAttributes);
final ActivityPayloadURI uri = ActivityPayloadURI.v1(connectionId, jobId, attemptNumber, StandardSyncOutput.class.getName());
final ActivityPayloadURI uri = ActivityPayloadURI.v1(connectionId, jobId, attemptNumber, "replication-output");

if (featureFlagClient.boolVariation(WriteReplicationOutputToObjectStorage.INSTANCE, new Connection(connectionId))) {
output.setUri(uri.toOpenApi());
Expand All @@ -228,6 +228,7 @@ public StandardSyncOutput replicateV2(final ReplicationActivityInput replication
final List<MetricAttribute> attrs = new ArrayList<>(Arrays.asList(metricAttributes));
attrs.add(new MetricAttribute(MetricTags.URI_ID, uri.getId()));
attrs.add(new MetricAttribute(MetricTags.URI_VERSION, uri.getVersion()));
attrs.add(new MetricAttribute(MetricTags.FAILURE_CAUSE, e.getClass().getSimpleName()));

metricClient.count(OssMetricsRegistry.PAYLOAD_FAILURE_WRITE, 1, attrs.toArray(new MetricAttribute[] {}));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ void testRecordingMetricCounterWithFailureCause() {
eq(1L),
eq(new MetricAttribute(MetricTags.CONNECTION_ID, String.valueOf(CONNECTION_ID))),
eq(new MetricAttribute(MetricTags.WORKSPACE_ID, String.valueOf(WORKSPACE_ID))),
eq(new MetricAttribute(MetricTags.RESET_WORKFLOW_FAILURE_CAUSE, failureCause.name())));
eq(new MetricAttribute(MetricTags.FAILURE_CAUSE, failureCause.name())));
}

@Test
Expand Down

0 comments on commit 6f794c4

Please sign in to comment.