Skip to content

Commit

Permalink
azure-eventhub v2: update logging for consistency (and other minor fixes) (#40546)
Browse files Browse the repository at this point in the history

* Always use partition_id for consistency

The input now consistently logs the partition ID using the `partition_id`
key instead of mixing `partition` and `partition_id`.

* Log 'no events received' per partition ID

The partition consumer now logs the message 'no events received' when
there are no events for the partition.

* Create one eventHubMetadata for each event

Prevent eventHubMetadata values from previous events from leaking into the
current one (for example, an optional partition_key set by an earlier event).
  • Loading branch information
zmoog authored Aug 23, 2024
1 parent 973af49 commit ee678f1
Showing 1 changed file with 29 additions and 20 deletions.
49 changes: 29 additions & 20 deletions x-pack/filebeat/input/azureeventhub/v2_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ func (in *eventHubInputV2) workersLoop(ctx context.Context, processor *azeventhu
go func() {
in.log.Infow(
"starting a partition worker",
"partition", partitionID,
"partition_id", partitionID,
)

if err := in.processEventsForPartition(ctx, processorPartitionClient); err != nil {
Expand All @@ -397,13 +397,13 @@ func (in *eventHubInputV2) workersLoop(ctx context.Context, processor *azeventhu
in.log.Infow(
"stopping processing events for partition",
"reason", err,
"partition", partitionID,
"partition_id", partitionID,
)
}

in.log.Infow(
"partition worker exited",
"partition", partitionID,
"partition_id", partitionID,
)
}()
}
Expand All @@ -428,7 +428,10 @@ func (in *eventHubInputV2) processEventsForPartition(ctx context.Context, partit
// 3/3 [END] Do cleanup here, like shutting down database clients
// or other resources used for processing this partition.
shutdownPartitionResources(ctx, partitionClient, pipelineClient)
in.log.Debugw("partition resources cleaned up", "partition", partitionID)
in.log.Debugw(
"partition resources cleaned up",
"partition_id", partitionID,
)
}()

// 2/3 [CONTINUOUS] Receive events, checkpointing as needed using UpdateCheckpoint.
Expand All @@ -444,7 +447,7 @@ func (in *eventHubInputV2) processEventsForPartition(ctx context.Context, partit
if errors.As(err, &eventHubError) && eventHubError.Code == azeventhubs.ErrorCodeOwnershipLost {
in.log.Infow(
"ownership lost for partition, stopping processing",
"partition", partitionID,
"partition_id", partitionID,
)

return nil
Expand All @@ -454,6 +457,10 @@ func (in *eventHubInputV2) processEventsForPartition(ctx context.Context, partit
}

if len(events) == 0 {
in.log.Debugw(
"no events received",
"partition_id", partitionID,
)
continue
}

Expand All @@ -467,30 +474,32 @@ func (in *eventHubInputV2) processEventsForPartition(ctx context.Context, partit
// processReceivedEvents
func (in *eventHubInputV2) processReceivedEvents(receivedEvents []*azeventhubs.ReceivedEventData, partitionID string, pipelineClient beat.Client) error {
processingStartTime := time.Now()
eventHubMetadata := mapstr.M{
"partition_id": partitionID,
"eventhub": in.config.EventHubName,
"consumer_group": in.config.ConsumerGroup,
}

for _, receivedEventData := range receivedEvents {
eventHubMetadata := mapstr.M{
"partition_id": partitionID,
"eventhub": in.config.EventHubName,
"consumer_group": in.config.ConsumerGroup,
}

// Update input metrics.
in.metrics.receivedMessages.Inc()
in.metrics.receivedBytes.Add(uint64(len(receivedEventData.Body)))

_, _ = eventHubMetadata.Put("offset", receivedEventData.Offset)
_, _ = eventHubMetadata.Put("sequence_number", receivedEventData.SequenceNumber)
_, _ = eventHubMetadata.Put("enqueued_time", receivedEventData.EnqueuedTime)

// The partition key is optional.
if receivedEventData.PartitionKey != nil {
_, _ = eventHubMetadata.Put("partition_key", *receivedEventData.PartitionKey)
}

// A single event can contain multiple records.
// We create a new event for each record.
records := in.messageDecoder.Decode(receivedEventData.Body)

for _, record := range records {
_, _ = eventHubMetadata.Put("offset", receivedEventData.Offset)
_, _ = eventHubMetadata.Put("sequence_number", receivedEventData.SequenceNumber)
_, _ = eventHubMetadata.Put("enqueued_time", receivedEventData.EnqueuedTime)

// The partition key is optional.
if receivedEventData.PartitionKey != nil {
_, _ = eventHubMetadata.Put("partition_key", *receivedEventData.PartitionKey)
}

event := beat.Event{
// this is the default value for the @timestamp field; usually the ingest
Expand Down Expand Up @@ -537,7 +546,7 @@ func initializePartitionResources(ctx context.Context, partitionClient *azeventh
if !ok {
log.Errorw(
"error updating checkpoint",
"partition", partitionClient.PartitionID(),
"partition_id", partitionClient.PartitionID(),
"acked", acked,
"error", "invalid data type",
"type", fmt.Sprintf("%T", data),
Expand All @@ -555,7 +564,7 @@ func initializePartitionResources(ctx context.Context, partitionClient *azeventh

log.Debugw(
"checkpoint updated",
"partition", partitionClient.PartitionID(),
"partition_id", partitionClient.PartitionID(),
"acked", acked,
"sequence_number", receivedEventData.SequenceNumber,
"offset", receivedEventData.Offset,
Expand Down

0 comments on commit ee678f1

Please sign in to comment.