Skip to content

Commit

Permalink
Fix partition selection issue in Azure log processing
Browse files Browse the repository at this point in the history
Because of the way the goroutine functions close over the loop
variable, every goroutine would likely process the last partition.
Raised by go vet.

Pass the variable as a parameter of the functions.
  • Loading branch information
msakrejda committed Jul 31, 2023
1 parent c1233a3 commit 476c261
Showing 1 changed file with 5 additions and 5 deletions.
10 changes: 5 additions & 5 deletions input/system/azure/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,24 +135,24 @@ func runEventHubHandlers(ctx context.Context, partitionIDs []string, logger *uti

for _, partitionID := range partitionIDs {
wg.Add(1)
go func() {
go func(partID string) {
defer wg.Done()

partitionClient, err := consumerClient.NewPartitionClient(partitionID, &azeventhubs.PartitionClientOptions{
partitionClient, err := consumerClient.NewPartitionClient(partID, &azeventhubs.PartitionClientOptions{
StartPosition: azeventhubs.StartPosition{
Earliest: to.Ptr(true),
},
})
if err != nil {
logger.PrintError("Failed to set up Azure Event Hub partition client for partition %s: %s", partitionID, err)
logger.PrintError("Failed to set up Azure Event Hub partition client for partition %s: %s", partID, err)
}
defer partitionClient.Close(ctx)

for {
events, err := partitionClient.ReceiveEvents(ctx, 1, nil)
if err != nil {
if err != context.Canceled {
logger.PrintError("Failed to receive events from Azure Event Hub for partition %s: %s", partitionID, err)
logger.PrintError("Failed to receive events from Azure Event Hub for partition %s: %s", partID, err)
}
break
}
Expand All @@ -161,7 +161,7 @@ func runEventHubHandlers(ctx context.Context, partitionIDs []string, logger *uti
handler(ctx, event)
}
}
}()
}(partitionID)
}

wg.Wait()
Expand Down

0 comments on commit 476c261

Please sign in to comment.