diff --git a/src/main/java/com/pinterest/secor/consumer/Consumer.java b/src/main/java/com/pinterest/secor/consumer/Consumer.java index 6e97c04f0..a738534d0 100644 --- a/src/main/java/com/pinterest/secor/consumer/Consumer.java +++ b/src/main/java/com/pinterest/secor/consumer/Consumer.java @@ -18,7 +18,11 @@ */ package com.pinterest.secor.consumer; -import com.pinterest.secor.common.*; +import com.pinterest.secor.common.DeterministicUploadPolicyTracker; +import com.pinterest.secor.common.FileRegistry; +import com.pinterest.secor.common.FileTrimmer; +import com.pinterest.secor.common.OffsetTracker; +import com.pinterest.secor.common.SecorConfig; import com.pinterest.secor.message.Message; import com.pinterest.secor.message.ParsedMessage; import com.pinterest.secor.monitoring.MetricCollector; @@ -197,13 +201,10 @@ protected boolean consumeNextMessage() { if (rawMessage != null) { // Before parsing, update the offset and remove any redundant data try { - mMessageWriter.adjustOffset(rawMessage); - } catch (Exception e) { throw new RuntimeException("Failed to adjust offset.", e); } - ParsedMessage parsedMessage = null; try { Message transformedMessage = mMessageTransformer.transform(rawMessage); diff --git a/src/main/java/com/pinterest/secor/uploader/Uploader.java b/src/main/java/com/pinterest/secor/uploader/Uploader.java index 5a706b9ac..dcfbdc76d 100644 --- a/src/main/java/com/pinterest/secor/uploader/Uploader.java +++ b/src/main/java/com/pinterest/secor/uploader/Uploader.java @@ -19,7 +19,15 @@ package com.pinterest.secor.uploader; import com.google.common.base.Joiner; -import com.pinterest.secor.common.*; +import com.pinterest.secor.common.DeterministicUploadPolicyTracker; +import com.pinterest.secor.common.FileRegistry; +import com.pinterest.secor.common.FileTrimmer; +import com.pinterest.secor.common.LogFilePath; +import com.pinterest.secor.common.OffsetTracker; +import com.pinterest.secor.common.SecorConfig; +import com.pinterest.secor.common.SecorConstants; +import com.pinterest.secor.common.TopicPartition; +import com.pinterest.secor.common.ZookeeperConnector; import com.pinterest.secor.monitoring.MetricCollector; import com.pinterest.secor.reader.MessageReader; import org.apache.commons.lang.StringUtils; diff --git a/src/main/java/com/pinterest/secor/writer/MessageWriter.java b/src/main/java/com/pinterest/secor/writer/MessageWriter.java index e2b9e037e..8c0904b4b 100644 --- a/src/main/java/com/pinterest/secor/writer/MessageWriter.java +++ b/src/main/java/com/pinterest/secor/writer/MessageWriter.java @@ -18,7 +18,13 @@ */ package com.pinterest.secor.writer; -import com.pinterest.secor.common.*; +import com.pinterest.secor.common.DeterministicUploadPolicyTracker; +import com.pinterest.secor.common.FileRegistry; +import com.pinterest.secor.common.FileTrimmer; +import com.pinterest.secor.common.LogFilePath; +import com.pinterest.secor.common.OffsetTracker; +import com.pinterest.secor.common.SecorConfig; +import com.pinterest.secor.common.TopicPartition; import com.pinterest.secor.io.FileWriter; import com.pinterest.secor.io.KeyValue; import com.pinterest.secor.message.Message; @@ -87,7 +93,6 @@ public void adjustOffset(Message message) throws Exception { // rewrite message from committed offset to message offset in order to remove duplicates mfileTrimmer.trimFilesWithOffsetsRange(topicPartition, mOffsetTracker.getAdjustedCommittedOffsetCount(topicPartition), message.getOffset()-1); - if (mDeterministicUploadPolicyTracker != null) { mDeterministicUploadPolicyTracker.reset(topicPartition); } diff --git a/src/test/java/com/pinterest/secor/writer/MessageWriterTest.java b/src/test/java/com/pinterest/secor/writer/MessageWriterTest.java index ffa6c8097..2d8c7a005 100644 --- a/src/test/java/com/pinterest/secor/writer/MessageWriterTest.java +++ b/src/test/java/com/pinterest/secor/writer/MessageWriterTest.java @@ -1,6 +1,12 @@ package com.pinterest.secor.writer; -import com.pinterest.secor.common.*; +import com.pinterest.secor.common.FileRegistry; +import com.pinterest.secor.common.FileTrimmer; +import com.pinterest.secor.common.LogFilePath; +import com.pinterest.secor.common.OffsetTracker; +import com.pinterest.secor.common.SecorConfig; +import com.pinterest.secor.common.SecorConstants; +import com.pinterest.secor.common.TopicPartition; import com.pinterest.secor.message.Message; import com.pinterest.secor.util.FileUtil; import com.pinterest.secor.util.IdUtil;