Skip to content

Commit

Permalink
Merge pull request #1228 from varun06/typo-fixes
Browse files Browse the repository at this point in the history
fixed some typo and added some comments
  • Loading branch information
bai authored Nov 28, 2018
2 parents 41db94b + 8cc8678 commit 96e43a8
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 5 deletions.
10 changes: 5 additions & 5 deletions consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type ConsumerGroup interface {
// to allow the user to perform any final tasks before a rebalance.
// 6. Finally, marked offsets are committed one last time before claims are released.
//
// Please note, that once a relance is triggered, sessions must be completed within
// Please note, that once a rebalance is triggered, sessions must be completed within
// Config.Consumer.Group.Rebalance.Timeout. This means that ConsumeClaim() functions must exit
// as quickly as possible to allow time for Cleanup() and the final offset commit. If the timeout
// is exceeded, the consumer will be removed from the group by Kafka, which will cause offset
Expand Down Expand Up @@ -267,7 +267,7 @@ func (c *consumerGroup) newSession(ctx context.Context, coordinator *Broker, top
}
}

return newConsumerGroupSession(c, ctx, claims, join.MemberId, join.GenerationId, handler)
return newConsumerGroupSession(ctx, c, claims, join.MemberId, join.GenerationId, handler)
}

func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) (*JoinGroupResponse, error) {
Expand Down Expand Up @@ -456,7 +456,7 @@ type consumerGroupSession struct {
hbDying, hbDead chan none
}

func newConsumerGroupSession(parent *consumerGroup, ctx context.Context, claims map[string][]int32, memberID string, generationID int32, handler ConsumerGroupHandler) (*consumerGroupSession, error) {
func newConsumerGroupSession(ctx context.Context, parent *consumerGroup, claims map[string][]int32, memberID string, generationID int32, handler ConsumerGroupHandler) (*consumerGroupSession, error) {
// init offset manager
offsets, err := newOffsetManagerFromClient(parent.groupID, memberID, generationID, parent.client)
if err != nil {
Expand Down Expand Up @@ -595,7 +595,7 @@ func (s *consumerGroupSession) consume(topic string, partition int32) {
s.parent.handleError(err, topic, partition)
}

// ensure consumer is clased & drained
// ensure consumer is closed & drained
claim.AsyncClose()
for _, err := range claim.waitClosed() {
s.parent.handleError(err, topic, partition)
Expand Down Expand Up @@ -691,7 +691,7 @@ type ConsumerGroupHandler interface {
// Setup is run at the beginning of a new session, before ConsumeClaim.
Setup(ConsumerGroupSession) error

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exites
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
// but before the offsets are committed for the very last time.
Cleanup(ConsumerGroupSession) error

Expand Down
2 changes: 2 additions & 0 deletions consumer_group_members.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package sarama

//ConsumerGroupMemberMetadata holds the metadata for consumer group
type ConsumerGroupMemberMetadata struct {
Version int16
Topics []string
Expand Down Expand Up @@ -36,6 +37,7 @@ func (m *ConsumerGroupMemberMetadata) decode(pd packetDecoder) (err error) {
return nil
}

//ConsumerGroupMemberAssignment holds the member assignment for a consume group
type ConsumerGroupMemberAssignment struct {
Version int16
Topics map[string][]int32
Expand Down
1 change: 1 addition & 0 deletions consumer_metadata_request.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package sarama

//ConsumerMetadataRequest is used for metadata requests
type ConsumerMetadataRequest struct {
ConsumerGroup string
}
Expand Down
1 change: 1 addition & 0 deletions consumer_metadata_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"strconv"
)

//ConsumerMetadataResponse holds the reponse for a consumer gorup meta data request
type ConsumerMetadataResponse struct {
Err KError
Coordinator *Broker
Expand Down

0 comments on commit 96e43a8

Please sign in to comment.