Skip to content

Commit

Permalink
Add more metrics for uploader:
Browse files Browse the repository at this point in the history
- uploader.offset_mismatches: count how many times upload fails;

- uploader.partition_trims: count how many times partition trimming happens;
        // There was a rebalancing event and someone committed an offset lower than that of the current message.  We need to trim local files.
- uploader.partition_deletes: count how many times partition deletion happens;
        // There was a rebalancing event and someone committed an offset beyond that of the current message.  We need to delete the local file.
	// zookeeper committed offset didn't match for topic/partition
  • Loading branch information
Henry Cai committed May 18, 2020
1 parent 7ecd387 commit 93c96e5
Showing 1 changed file with 9 additions and 1 deletion.
10 changes: 9 additions & 1 deletion src/main/java/com/pinterest/secor/uploader/Uploader.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,12 @@ protected void uploadFiles(TopicPartition topicPartition) throws Exception {
uploadHandles.add(mUploadManager.upload(path));
}
for (Handle<?> uploadHandle : uploadHandles) {
uploadHandle.get();
try {
uploadHandle.get();
} catch (Exception ex) {
mMetricCollector.increment("uploader.upload.failures", topicPartition.getTopic());
throw ex;
}
}
mFileRegistry.deleteTopicPartition(topicPartition);
if (mDeterministicUploadPolicyTracker != null) {
Expand All @@ -146,6 +151,7 @@ protected void uploadFiles(TopicPartition topicPartition) throws Exception {
LOG.warn("Zookeeper committed offset didn't match for topic {} partition {}: {} vs {}",
topicPartition.getTopic(), topicPartition.getTopic(), zookeeperCommittedOffsetCount,
committedOffsetCount);
mMetricCollector.increment("uploader.offset_mismatches", topicPartition.getTopic());
}
} finally {
mZookeeperConnector.unlock(lockPath);
Expand Down Expand Up @@ -282,6 +288,7 @@ protected void checkTopicPartition(TopicPartition topicPartition, boolean forceU
} else if (newOffsetCount > lastSeenOffset) { // && oldOffset < newOffset
LOG.debug("last seen offset {} is lower than committed offset count {}. Deleting files in topic {} partition {}",
lastSeenOffset, newOffsetCount,topicPartition.getTopic(), topicPartition.getPartition());
mMetricCollector.increment("uploader.partition_deletes", topicPartition.getTopic());
// There was a rebalancing event and someone committed an offset beyond that of the
// current message. We need to delete the local file.
mFileRegistry.deleteTopicPartition(topicPartition);
Expand All @@ -292,6 +299,7 @@ protected void checkTopicPartition(TopicPartition topicPartition, boolean forceU
LOG.debug("previous committed offset count {} is lower than committed offset {} is lower than or equal to last seen offset {}. " +
"Trimming files in topic {} partition {}",
oldOffsetCount, newOffsetCount, lastSeenOffset, topicPartition.getTopic(), topicPartition.getPartition());
mMetricCollector.increment("uploader.partition_trims", topicPartition.getTopic());
// There was a rebalancing event and someone committed an offset lower than that
// of the current message. We need to trim local files.
trimFiles(topicPartition, newOffsetCount);
Expand Down

0 comments on commit 93c96e5

Please sign in to comment.