Skip to content

Commit

Permalink
new event consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
latolukasz committed Sep 1, 2021
1 parent fd90674 commit d289e87
Showing 1 changed file with 6 additions and 3 deletions.
9 changes: 6 additions & 3 deletions event_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,9 @@ func (r *eventsConsumer) consume(ctx context.Context, name string, count int, ha
case <-done:
return true
default:
r.digest(ctx, attributes, done)
if r.digest(ctx, attributes) {
return true
}
}
}
}
Expand All @@ -302,11 +304,12 @@ type consumeAttributes struct {
Streams []string
}

func (r *eventsConsumer) digest(ctx context.Context, attributes *consumeAttributes, done chan bool) {
func (r *eventsConsumer) digest(ctx context.Context, attributes *consumeAttributes) (stop bool) {
finished := r.digestKeys(ctx, attributes)
if !r.loop && finished {
close(done)
return true
}
return false
}

func (r *eventsConsumer) digestKeys(ctx context.Context, attributes *consumeAttributes) (finished bool) {
Expand Down

0 comments on commit d289e87

Please sign in to comment.