From 6f794c46fd67c94af5e67eb655721bbe1c2337c8 Mon Sep 17 00:00:00 2001 From: Ryan Br Date: Fri, 5 Apr 2024 11:44:52 -0700 Subject: [PATCH] Add some dimensions to help troubleshoot read failures. (#11989) --- .../payload/ActivityPayloadStorageClient.kt | 14 +++++++++++++- .../java/io/airbyte/metrics/lib/MetricTags.java | 2 +- .../activities/RecordMetricActivityImpl.java | 2 +- .../temporal/sync/ReplicationActivityImpl.java | 3 ++- .../activities/RecordMetricActivityImplTest.java | 2 +- 5 files changed, 18 insertions(+), 5 deletions(-) diff --git a/airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/payload/ActivityPayloadStorageClient.kt b/airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/payload/ActivityPayloadStorageClient.kt index b165c479728..41b3e8e21d6 100644 --- a/airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/payload/ActivityPayloadStorageClient.kt +++ b/airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/payload/ActivityPayloadStorageClient.kt @@ -1,11 +1,15 @@ package io.airbyte.workers.payload import io.airbyte.commons.json.JsonSerde +import io.airbyte.metrics.lib.ApmTraceUtils import io.airbyte.metrics.lib.MetricAttribute import io.airbyte.metrics.lib.MetricClient import io.airbyte.metrics.lib.MetricTags import io.airbyte.metrics.lib.OssMetricsRegistry import io.airbyte.workers.storage.StorageClient +import io.github.oshai.kotlinlogging.KotlinLogging + +private val logger = KotlinLogging.logger {} /** * Writes and reads activity payloads to and from the configured object store. @@ -78,6 +82,8 @@ class ActivityPayloadStorageClient( return expected } + ApmTraceUtils.addTagsToTrace(mapOf(Pair(MetricTags.URI_ID, uri.id), Pair(MetricTags.URI_VERSION, uri.version))) + val baseAttrs = attrs + listOf( @@ -91,7 +97,13 @@ class ActivityPayloadStorageClient( try { remote = readJSON(uri, target) } catch (e: Exception) { - metricClient.count(OssMetricsRegistry.PAYLOAD_FAILURE_READ, 1, *baseAttrs.toTypedArray()) + logger.error { e } + + ApmTraceUtils.addExceptionToTrace(e) + val attrsWithException = + baseAttrs + MetricAttribute(MetricTags.FAILURE_CAUSE, e.javaClass.simpleName) + + metricClient.count(OssMetricsRegistry.PAYLOAD_FAILURE_READ, 1, *attrsWithException.toTypedArray()) return expected } diff --git a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/MetricTags.java b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/MetricTags.java index 5c5098bb63b..f748405db69 100644 --- a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/MetricTags.java +++ b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/MetricTags.java @@ -46,7 +46,7 @@ public class MetricTags { public static final String NOTIFICATION_CLIENT = "notification_client"; public static final String RECORD_COUNT_TYPE = "record_count_type"; public static final String RELEASE_STAGE = "release_stage"; - public static final String RESET_WORKFLOW_FAILURE_CAUSE = "failure_cause"; + public static final String FAILURE_CAUSE = "failure_cause"; public static final String SOURCE_ID = "source_id"; public static final String SOURCE_IMAGE = "source_image"; public static final String STATUS = "status"; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RecordMetricActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RecordMetricActivityImpl.java index 648afada87c..1ec99ae317d 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RecordMetricActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RecordMetricActivityImpl.java @@ -64,7 +64,7 @@ public void recordWorkflowCountMetric(final RecordMetricInput metricInput) { if (metricInput.getMetricAttributes() != null) { baseMetricAttributes.addAll(Stream.of(metricInput.getMetricAttributes()).collect(Collectors.toList())); } - metricInput.getFailureCause().ifPresent(fc -> baseMetricAttributes.add(new MetricAttribute(MetricTags.RESET_WORKFLOW_FAILURE_CAUSE, fc.name()))); + metricInput.getFailureCause().ifPresent(fc -> baseMetricAttributes.add(new MetricAttribute(MetricTags.FAILURE_CAUSE, fc.name()))); metricClient.count(metricInput.getMetricName(), 1L, baseMetricAttributes.toArray(new MetricAttribute[] {})); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java index 5686d085016..7ca994b0c8e 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java @@ -217,7 +217,7 @@ public StandardSyncOutput replicateV2(final ReplicationActivityInput replication LOGGER.info("sync summary after backfill: {}", standardSyncOutput); final StandardSyncOutput output = payloadChecker.validatePayloadSize(standardSyncOutput, metricAttributes); - final ActivityPayloadURI uri = ActivityPayloadURI.v1(connectionId, jobId, attemptNumber, StandardSyncOutput.class.getName()); + final ActivityPayloadURI uri = ActivityPayloadURI.v1(connectionId, jobId, attemptNumber, "replication-output"); if (featureFlagClient.boolVariation(WriteReplicationOutputToObjectStorage.INSTANCE, new Connection(connectionId))) { output.setUri(uri.toOpenApi()); @@ -228,6 +228,7 @@ public StandardSyncOutput replicateV2(final ReplicationActivityInput replication final List attrs = new ArrayList<>(Arrays.asList(metricAttributes)); attrs.add(new MetricAttribute(MetricTags.URI_ID, uri.getId())); attrs.add(new MetricAttribute(MetricTags.URI_VERSION, uri.getVersion())); + attrs.add(new MetricAttribute(MetricTags.FAILURE_CAUSE, e.getClass().getSimpleName())); metricClient.count(OssMetricsRegistry.PAYLOAD_FAILURE_WRITE, 1, attrs.toArray(new MetricAttribute[] {})); } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/RecordMetricActivityImplTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/RecordMetricActivityImplTest.java index 933ef03d615..a04ae870122 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/RecordMetricActivityImplTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/RecordMetricActivityImplTest.java @@ -95,7 +95,7 @@ void testRecordingMetricCounterWithFailureCause() { eq(1L), eq(new MetricAttribute(MetricTags.CONNECTION_ID, String.valueOf(CONNECTION_ID))), eq(new MetricAttribute(MetricTags.WORKSPACE_ID, String.valueOf(WORKSPACE_ID))), - eq(new MetricAttribute(MetricTags.RESET_WORKFLOW_FAILURE_CAUSE, failureCause.name()))); + eq(new MetricAttribute(MetricTags.FAILURE_CAUSE, failureCause.name()))); } @Test