Skip to content

Commit

Permalink
Allow the Consumer to disable auto-commit offsets (#1164)
Browse files Browse the repository at this point in the history
* #1158 Allow the Consumer to disable auto-commit offsets

* Ignoring Close if commit offsets are disabled. Avoit starting mainloop if commit offsets are disabled

* Renamed Consumer.Offsets.AutoCommitEnable to Consumer.Offsets.AutoCommit.Enable. Renamed Consumer.Offsets.CommitInteval to Consumer.Offsets.AutoCommit.Inteval

* started on unittest for Consumer.Offsets.AutoCommit.Enable

* Moved check for Consumer.Offsets.AutoCommit.Enable to offsetManager.flushToBroker to keep mainLoop

* moved ticker back to struct

* Fixed TestNewOffsetManagerOffsetsAutoCommit fails because of low timeout
  • Loading branch information
kjelle authored and varun06 committed Nov 22, 2019
1 parent bb74e49 commit 72a629d
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 10 deletions.
16 changes: 12 additions & 4 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,8 +338,15 @@ type Config struct {
// offsets. This currently requires the manual use of an OffsetManager
// but will eventually be automated.
Offsets struct {
// How frequently to commit updated offsets. Defaults to 1s.
CommitInterval time.Duration
AutoCommit struct {
// Whether or not to auto-commit updated offsets back to the broker.
// (default enabled).
Enable bool

// How frequently to commit updated offsets. Ineffective unless
// auto-commit is enabled (default 1s)
Interval time.Duration
}

// The initial offset to use if no offset was previously committed.
// Should be OffsetNewest or OffsetOldest. Defaults to OffsetNewest.
Expand Down Expand Up @@ -423,7 +430,8 @@ func NewConfig() *Config {
c.Consumer.MaxWaitTime = 250 * time.Millisecond
c.Consumer.MaxProcessingTime = 100 * time.Millisecond
c.Consumer.Return.Errors = false
c.Consumer.Offsets.CommitInterval = 1 * time.Second
c.Consumer.Offsets.AutoCommit.Enable = true
c.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second
c.Consumer.Offsets.Initial = OffsetNewest
c.Consumer.Offsets.Retry.Max = 3

Expand Down Expand Up @@ -650,7 +658,7 @@ func (c *Config) Validate() error {
return ConfigurationError("Consumer.MaxProcessingTime must be > 0")
case c.Consumer.Retry.Backoff < 0:
return ConfigurationError("Consumer.Retry.Backoff must be >= 0")
case c.Consumer.Offsets.CommitInterval <= 0:
case c.Consumer.Offsets.AutoCommit.Interval <= 0:
return ConfigurationError("Consumer.Offsets.CommitInterval must be > 0")
case c.Consumer.Offsets.Initial != OffsetOldest && c.Consumer.Offsets.Initial != OffsetNewest:
return ConfigurationError("Consumer.Offsets.Initial must be OffsetOldest or OffsetNewest")
Expand Down
7 changes: 6 additions & 1 deletion offset_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func newOffsetManagerFromClient(group, memberID string, generation int32, client
client: client,
conf: conf,
group: group,
ticker: time.NewTicker(conf.Consumer.Offsets.CommitInterval),
ticker: time.NewTicker(conf.Consumer.Offsets.AutoCommit.Interval),
poms: make(map[string]map[int32]*partitionOffsetManager),

memberID: memberID,
Expand Down Expand Up @@ -233,7 +233,12 @@ func (om *offsetManager) mainLoop() {
}
}

// flushToBroker is ignored if auto-commit offsets is disabled
func (om *offsetManager) flushToBroker() {
if !om.conf.Consumer.Offsets.AutoCommit.Enable {
return
}

req := om.constructRequest()
if req == nil {
return
Expand Down
85 changes: 80 additions & 5 deletions offset_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,14 @@ import (
)

func initOffsetManagerWithBackoffFunc(t *testing.T, retention time.Duration,
backoffFunc func(retries, maxRetries int) time.Duration) (om OffsetManager,
backoffFunc func(retries, maxRetries int) time.Duration, config *Config) (om OffsetManager,
testClient Client, broker, coordinator *MockBroker) {

config := NewConfig()
config.Metadata.Retry.Max = 1
if backoffFunc != nil {
config.Metadata.Retry.BackoffFunc = backoffFunc
}
config.Consumer.Offsets.CommitInterval = 1 * time.Millisecond
config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Millisecond
config.Version = V0_9_0_0
if retention > 0 {
config.Consumer.Offsets.Retention = retention
Expand Down Expand Up @@ -52,7 +51,7 @@ func initOffsetManagerWithBackoffFunc(t *testing.T, retention time.Duration,

func initOffsetManager(t *testing.T, retention time.Duration) (om OffsetManager,
testClient Client, broker, coordinator *MockBroker) {
return initOffsetManagerWithBackoffFunc(t, retention, nil)
return initOffsetManagerWithBackoffFunc(t, retention, nil, NewConfig())
}

func initPartitionOffsetManager(t *testing.T, om OffsetManager,
Expand Down Expand Up @@ -97,6 +96,82 @@ func TestNewOffsetManager(t *testing.T) {
}
}

var offsetsautocommitTestTable = []struct {
name string
set bool // if given will override default configuration for Consumer.Offsets.AutoCommit.Enable
enable bool
}{
{
"AutoCommit (default)",
false, // use default
true,
},
{
"AutoCommit Enabled",
true,
true,
},
{
"AutoCommit Disabled",
true,
false,
},
}

func TestNewOffsetManagerOffsetsAutoCommit(t *testing.T) {
// Tests to validate configuration of `Consumer.Offsets.AutoCommit.Enable`
for _, tt := range offsetsautocommitTestTable {
t.Run(tt.name, func(t *testing.T) {

config := NewConfig()
if tt.set {
config.Consumer.Offsets.AutoCommit.Enable = tt.enable
}
om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, nil, config)
pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")

// Wait long enough for the test not to fail..
timeout := 50 * config.Consumer.Offsets.AutoCommit.Interval

called := make(chan none)

ocResponse := new(OffsetCommitResponse)
ocResponse.AddError("my_topic", 0, ErrNoError)
handler := func(req *request) (res encoder) {
close(called)
return ocResponse
}
coordinator.setHandler(handler)

// Should force an offset commit, if auto-commit is enabled.
expected := int64(1)
pom.ResetOffset(expected, "modified_meta")
_, _ = pom.NextOffset()

select {
case <-called:
// OffsetManager called on the wire.
if !config.Consumer.Offsets.AutoCommit.Enable {
t.Errorf("Received request for: %s when AutoCommit is disabled", tt.name)
}
case <-time.After(timeout):
// Timeout waiting for OffsetManager to call on the wire.
if config.Consumer.Offsets.AutoCommit.Enable {
t.Errorf("No request received for: %s after waiting for %v", tt.name, timeout)
}
}

broker.Close()
coordinator.Close()

// !! om must be closed before the pom so pom.release() is called before pom.Close()
safeClose(t, om)
safeClose(t, pom)
safeClose(t, testClient)
})
}
}

// Test recovery from ErrNotCoordinatorForConsumer
// on first fetchInitialOffset call
func TestOffsetManagerFetchInitialFail(t *testing.T) {
Expand Down Expand Up @@ -148,7 +223,7 @@ func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) {
atomic.AddInt32(&retryCount, 1)
return 0
}
om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, backoff)
om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, backoff, NewConfig())

// Error on first fetchInitialOffset call
responseBlock := OffsetFetchResponseBlock{
Expand Down

0 comments on commit 72a629d

Please sign in to comment.