Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-7366: Make topic configs segment.bytes and segment.ms to take effect immediately #5728

Merged
merged 6 commits into from
Oct 9, 2018

Conversation

omkreddy
Copy link
Contributor

@omkreddy omkreddy commented Oct 2, 2018

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@omkreddy
Copy link
Contributor Author

omkreddy commented Oct 2, 2018

@junrao Please take a look, whenever you get a chance

Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@omkreddy : Thanks for the patch. Looks good. Just one comment below.


(1 to 50).foreach(i => TestUtils.produceMessage(servers, tp.topic, i.toString))
// Verify that the new config is used for all segments
TestUtils.waitUntilTrue(() => log.logSegments.forall(_.size > 1000), "Log segment size change not applied")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need the waitUntil part? The default producer acks is 1. So, by the time that TestUtils.produceMessage returns, the data should be in the log segments.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@junrao Thanks for the review. Updated the PR.

@@ -181,21 +180,23 @@ class LogSegmentTest {
assertFalse(reopened.timeIndex.isFull)
assertFalse(reopened.offsetIndex.isFull)

assertFalse(reopened.shouldRoll(messagesSize = 1024,
val logConfig = LogConfig(Map().asJava)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we be setting the relevant configs here? Previously we were passing maxSegmentMs to LogSegment and setting maxSegmentBytes to Int.MaxValue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for catching this. we should pass the configs here.

def shouldRoll(messagesSize: Int, maxTimestampInMessages: Long, maxOffsetInMessages: Long, now: Long): Boolean = {
val reachedRollMs = timeWaitedForRoll(now, maxTimestampInMessages) > maxSegmentMs - rollJitterMs
size > maxSegmentBytes - messagesSize ||
def shouldRoll(config: LogConfig, messagesSize: Int, maxTimestampInMessages: Long, maxOffsetInMessages: Long, now: Long): Boolean = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, why are we passing the whole config when we only need two parameters? One thing we should consider is introducing a RollParams case class to make this more maintainable. Generally, it makes it harder to test and understand if we pass these config classes with lots of params when we only need a subset.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RollParams could have an apply that takes AppendInfo and LogConfig which could be used by Log.maybeRoll.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ijuma Thanks for the review. Yes, we can add a RollParams class to hold the params.
Added a commit which implements suggested approach. Pls take take a look, when you get a chance.

@omkreddy omkreddy force-pushed the KAFKA-7366-Segment-bytes branch from 8a5e64a to 0729657 Compare October 8, 2018 16:37
Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@omkreddy : Thanks for the updated patch. LGTM. Could you rebase?

LogConfig.SegmentBytesProp -> Int.MaxValue
).asJava)

var rollParams = RollParams(logConfig, getLogAppendInfo(100L), 1024, time.milliseconds())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we use the RollParams that takes the values explicitly instead of creating LogConfig and AppendInfo in this case? It doesn't seem like it helps in this case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@junrao @ijuma Thanks for the reviews. Addressed review comment and rebased the PR.

@omkreddy omkreddy force-pushed the KAFKA-7366-Segment-bytes branch 2 times, most recently from 75a8922 to 82c26a2 Compare October 9, 2018 04:02
@omkreddy omkreddy force-pushed the KAFKA-7366-Segment-bytes branch from 82c26a2 to b9643f4 Compare October 9, 2018 05:44
Copy link
Member

@ijuma ijuma left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed a minor commit, LGTM now.

@junrao junrao merged commit 0848b78 into apache:trunk Oct 9, 2018
@omkreddy omkreddy deleted the KAFKA-7366-Segment-bytes branch October 9, 2018 17:40
junrao pushed a commit that referenced this pull request Oct 9, 2018
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants