From 93c96e5a428d875d08493cbd9cc0f7bf76194c11 Mon Sep 17 00:00:00 2001 From: Henry Cai Date: Sun, 17 May 2020 23:20:23 -0700 Subject: [PATCH] Add more metrics for uploader: - uploader.offset_mismatches: count how many times upload fails; - uploader.partition_trims: count how many times partition trimming happens; // There was a rebalancing event and someone committed an offset lower than that of the current message. We need to trim local files. - uploader.partition_deletes: count how many times partition deletion happens; // There was a rebalancing event and someone committed an offset beyond that of the current message. We need to delete the local file. // zookeeper committed offset didn't match for topic/partition --- .../java/com/pinterest/secor/uploader/Uploader.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/pinterest/secor/uploader/Uploader.java b/src/main/java/com/pinterest/secor/uploader/Uploader.java index 8ec9fa6cb..910f51d2d 100644 --- a/src/main/java/com/pinterest/secor/uploader/Uploader.java +++ b/src/main/java/com/pinterest/secor/uploader/Uploader.java @@ -130,7 +130,12 @@ protected void uploadFiles(TopicPartition topicPartition) throws Exception { uploadHandles.add(mUploadManager.upload(path)); } for (Handle uploadHandle : uploadHandles) { - uploadHandle.get(); + try { + uploadHandle.get(); + } catch (Exception ex) { + mMetricCollector.increment("uploader.upload.failures", topicPartition.getTopic()); + throw ex; + } } mFileRegistry.deleteTopicPartition(topicPartition); if (mDeterministicUploadPolicyTracker != null) { @@ -146,6 +151,7 @@ protected void uploadFiles(TopicPartition topicPartition) throws Exception { LOG.warn("Zookeeper committed offset didn't match for topic {} partition {}: {} vs {}", topicPartition.getTopic(), topicPartition.getTopic(), zookeeperCommittedOffsetCount, committedOffsetCount); + mMetricCollector.increment("uploader.offset_mismatches", topicPartition.getTopic()); } } finally { mZookeeperConnector.unlock(lockPath); @@ -282,6 +288,7 @@ protected void checkTopicPartition(TopicPartition topicPartition, boolean forceU } else if (newOffsetCount > lastSeenOffset) { // && oldOffset < newOffset LOG.debug("last seen offset {} is lower than committed offset count {}. Deleting files in topic {} partition {}", lastSeenOffset, newOffsetCount,topicPartition.getTopic(), topicPartition.getPartition()); + mMetricCollector.increment("uploader.partition_deletes", topicPartition.getTopic()); // There was a rebalancing event and someone committed an offset beyond that of the // current message. We need to delete the local file. mFileRegistry.deleteTopicPartition(topicPartition); @@ -292,6 +299,7 @@ protected void checkTopicPartition(TopicPartition topicPartition, boolean forceU LOG.debug("previous committed offset count {} is lower than committed offset {} is lower than or equal to last seen offset {}. " + "Trimming files in topic {} partition {}", oldOffsetCount, newOffsetCount, lastSeenOffset, topicPartition.getTopic(), topicPartition.getPartition()); + mMetricCollector.increment("uploader.partition_trims", topicPartition.getTopic()); // There was a rebalancing event and someone committed an offset lower than that // of the current message. We need to trim local files. trimFiles(topicPartition, newOffsetCount);